use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use serde_json::Value;
use sqlx::{PgPool, Row};
use uuid::Uuid;
use super::{Result, batch_error, sqlx_error};
use crate::CausalRef;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct VisualizationOptions {
pub include_errors: bool,
pub break_glass: bool,
pub max_events: usize,
}
impl Default for VisualizationOptions {
fn default() -> Self {
Self {
include_errors: false,
break_glass: false,
max_events: 5_000,
}
}
}
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct HtmlOptions {
pub title: Option<String>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct VisualizationDocument {
pub provenance: VisualizationProvenance,
pub root_run_id: String,
pub overview: VisualizationOverview,
pub events: Vec<VisualizationEvent>,
pub lanes: Vec<VisualizationLane>,
pub spans: Vec<VisualizationSpan>,
pub workflows: Vec<VisualizationWorkflow>,
pub continuations: Vec<VisualizationContinuation>,
pub provider_batches: Vec<VisualizationProviderBatch>,
pub fork_joins: Vec<VisualizationForkJoin>,
pub warnings: Vec<VisualizationWarning>,
pub attention: Vec<VisualizationAttention>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationProvenance {
pub generator: String,
pub generator_version: String,
pub generated_at: String,
pub root_run_id: String,
pub include_errors: bool,
pub break_glass: bool,
pub max_events: usize,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationOverview {
pub total_events: usize,
pub run_count: usize,
pub first_created_at: Option<String>,
pub last_created_at: Option<String>,
pub event_family_counts: BTreeMap<String, usize>,
pub workflow_status_counts: BTreeMap<String, usize>,
pub continuation_status_counts: BTreeMap<String, usize>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct VisualizationEvent {
pub event_id: String,
pub root_run_id: String,
pub run_id: String,
pub parent_run_id: Option<String>,
pub fork_name: Option<String>,
pub event_ordinal: i64,
pub caused_by: CausalRef,
pub caused_by_event_id: Option<String>,
pub caused_event_ids: Vec<String>,
pub event_type: String,
pub event_family: String,
pub event_version: i16,
pub continuation_id: Option<String>,
pub label: String,
pub severity: String,
pub attention: Vec<String>,
pub summary: Vec<VisualizationField>,
pub event: Value,
pub created_at: String,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationField {
pub key: String,
pub value: String,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationLane {
pub run_id: String,
pub parent_run_id: Option<String>,
pub fork_name: Option<String>,
pub first_created_at: Option<String>,
pub event_ids: Vec<String>,
pub current_status: Option<String>,
pub warnings: Vec<String>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationSpan {
pub kind: String,
pub run_id: String,
pub start_event_id: String,
pub end_event_id: Option<String>,
pub label: String,
pub open: bool,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct VisualizationWorkflow {
pub run_id: String,
pub root_run_id: String,
pub status: String,
pub parent_run_id: Option<String>,
pub fork_name: Option<String>,
pub error_present: bool,
pub error_sexpr: Option<String>,
pub next_event_ordinal: i64,
pub created_at: String,
pub updated_at: String,
pub quiescent: bool,
pub quiesced_at: Option<String>,
pub workflow: Option<Value>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct VisualizationContinuation {
pub continuation_id: String,
pub workflow_run_id: String,
pub kind: String,
pub status: String,
pub provider: Option<String>,
pub provider_batch_id: Option<String>,
pub provider_message_id: Option<String>,
pub output_key: Option<String>,
pub result_type: Option<String>,
pub error_present: bool,
pub error_sexpr: Option<String>,
pub attempt_of: Option<String>,
pub attempt: i32,
pub input_tokens: Option<i64>,
pub output_tokens: Option<i64>,
pub cache_creation_input_tokens: Option<i64>,
pub cache_read_input_tokens: Option<i64>,
pub created_at: String,
pub updated_at: String,
pub submitted_at: Option<String>,
pub completed_at: Option<String>,
pub quiescent: bool,
pub quiesced_at: Option<String>,
pub request: Option<Value>,
pub response: Option<Value>,
pub usage: Option<Value>,
pub server_tool_use: Option<Value>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct VisualizationProviderBatch {
pub provider: String,
pub provider_batch_id: String,
pub status: String,
pub request_count: i32,
pub error_present: bool,
pub error_sexpr: Option<String>,
pub created_at: String,
pub updated_at: String,
pub submitted_at: String,
pub completed_at: Option<String>,
pub quiescent: bool,
pub quiesced_at: Option<String>,
pub response: Option<Value>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationForkJoin {
pub parent_run_id: String,
pub lhs_run_id: String,
pub rhs_run_id: String,
pub status: String,
pub error_present: bool,
pub error_sexpr: Option<String>,
pub created_at: String,
pub updated_at: String,
pub quiescent: bool,
pub quiesced_at: Option<String>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationWarning {
pub code: String,
pub message: String,
pub event_id: Option<String>,
pub run_id: Option<String>,
pub continuation_id: Option<String>,
pub provider_batch_id: Option<String>,
}
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct VisualizationAttention {
pub kind: String,
pub severity: String,
pub label: String,
pub event_id: Option<String>,
pub run_id: Option<String>,
pub continuation_id: Option<String>,
pub provider_batch_id: Option<String>,
}
pub async fn build_root_workflow_visualization(
pool: &PgPool,
root_run_id: &str,
options: VisualizationOptions,
) -> Result<VisualizationDocument> {
if options.max_events == 0 {
return Err(batch_error(
"visualizer-invalid-max-events",
"max_events must be greater than zero",
));
}
let generated_at = load_generated_at(pool).await?;
let event_count = load_event_count(pool, root_run_id).await?;
if event_count == 0 {
return Err(batch_error(
"visualizer-root-has-no-events",
"root workflow has no ledger events",
)
.with_string_field("root_run_id", root_run_id));
}
if event_count > options.max_events {
return Err(batch_error(
"visualizer-root-too-large",
"root workflow has more events than max_events",
)
.with_string_field("root_run_id", root_run_id)
.with_atom_field("event_count", event_count)
.with_atom_field("max_events", options.max_events));
}
let events = load_events(pool, root_run_id).await?;
let run_ids = unique_run_ids(&events);
let event_continuation_ids = event_continuation_ids(&events);
let workflows = load_workflows(pool, root_run_id, &options).await?;
let continuations =
load_continuations(pool, &run_ids, &event_continuation_ids, &options).await?;
let provider_batch_ids = referenced_provider_batch_ids(&events, &continuations);
let provider_batches = load_provider_batches(pool, &provider_batch_ids, &options).await?;
let fork_joins = load_fork_joins(pool, &run_ids, &options).await?;
Ok(project_visualization(
root_run_id.to_string(),
generated_at,
options,
events,
workflows,
continuations,
provider_batches,
fork_joins,
))
}
pub fn render_visualization_html(
document: &VisualizationDocument,
options: HtmlOptions,
) -> Result<String> {
let title = options
.title
.unwrap_or_else(|| format!("Batch workflow {}", document.root_run_id));
let json = serde_json::to_string_pretty(document).map_err(|err| {
batch_error(
"visualizer-render-json",
"failed to serialize visualization document",
)
.with_string_field("source", &err.to_string())
})?;
let json = escape_json_for_html_script(&json);
let mut html = String::new();
html.push_str("<!doctype html>\n<html lang=\"en\">\n<head>\n<meta charset=\"utf-8\">\n");
html.push_str("<meta name=\"viewport\" content=\"width=device-width,initial-scale=1\">\n");
html.push_str("<title>");
html.push_str(&escape_html_text(&title));
html.push_str("</title>\n");
html.push_str(HTML_STYLE);
html.push_str("</head>\n<body>\n");
html.push_str("<script id=\"visualization-data\" type=\"application/json\">");
html.push_str(&json);
html.push_str("</script>\n");
html.push_str(HTML_BODY);
html.push_str(HTML_SCRIPT);
html.push_str("\n</body>\n</html>\n");
Ok(html)
}
#[derive(Clone, Debug, PartialEq)]
struct RawEvent {
event_id: Uuid,
root_run_id: String,
run_id: String,
parent_run_id: Option<String>,
fork_name: Option<String>,
event_ordinal: i64,
caused_by: CausalRef,
event_type: String,
event_version: i16,
continuation_id: Option<String>,
event: Value,
created_at: String,
}
fn project_visualization(
root_run_id: String,
generated_at: String,
options: VisualizationOptions,
raw_events: Vec<RawEvent>,
workflows: Vec<VisualizationWorkflow>,
continuations: Vec<VisualizationContinuation>,
provider_batches: Vec<VisualizationProviderBatch>,
fork_joins: Vec<VisualizationForkJoin>,
) -> VisualizationDocument {
let mut caused_by_reverse: BTreeMap<String, Vec<String>> = BTreeMap::new();
for event in &raw_events {
if let CausalRef::EventId { event_id } = &event.caused_by {
caused_by_reverse
.entry(event_id.to_string())
.or_default()
.push(event.event_id.to_string());
}
}
let raw_by_event_id = raw_events
.iter()
.map(|event| (event.event_id.to_string(), event))
.collect::<HashMap<_, _>>();
for caused_ids in caused_by_reverse.values_mut() {
caused_ids.sort_by(|lhs, rhs| {
raw_by_event_id
.get(lhs)
.map(|event| (&event.created_at, event.event_ordinal, event.event_id))
.cmp(
&raw_by_event_id
.get(rhs)
.map(|event| (&event.created_at, event.event_ordinal, event.event_id)),
)
});
}
let mut events = raw_events
.iter()
.map(|event| {
let event_id = event.event_id.to_string();
let family = event_family(&event.event_type);
let attention = event_attention(&event.event_type);
VisualizationEvent {
event_id: event_id.clone(),
root_run_id: event.root_run_id.clone(),
run_id: event.run_id.clone(),
parent_run_id: event.parent_run_id.clone(),
fork_name: event.fork_name.clone(),
event_ordinal: event.event_ordinal,
caused_by: event.caused_by.clone(),
caused_by_event_id: caused_by_event_id(&event.caused_by),
caused_event_ids: caused_by_reverse.remove(&event_id).unwrap_or_default(),
event_type: event.event_type.clone(),
event_family: family,
event_version: event.event_version,
continuation_id: event.continuation_id.clone(),
label: event_label(&event.event_type, &event.event),
severity: event_severity(&event.event_type),
attention,
summary: event_summary(&event.event_type, &event.event),
event: event.event.clone(),
created_at: event.created_at.clone(),
}
})
.collect::<Vec<_>>();
events.sort_by(|lhs, rhs| {
lhs.created_at
.cmp(&rhs.created_at)
.then_with(|| lhs.event_id.cmp(&rhs.event_id))
});
let workflow_by_run = workflows
.iter()
.map(|workflow| (workflow.run_id.clone(), workflow))
.collect::<HashMap<_, _>>();
let continuation_by_id = continuations
.iter()
.map(|continuation| (continuation.continuation_id.clone(), continuation))
.collect::<HashMap<_, _>>();
let provider_batch_by_id = provider_batches
.iter()
.map(|batch| (batch.provider_batch_id.clone(), batch))
.collect::<HashMap<_, _>>();
let mut lane_builders = build_lane_builders(&raw_events, &workflow_by_run);
let warnings = build_warnings(
&root_run_id,
&raw_events,
&lane_builders,
&workflow_by_run,
&continuation_by_id,
&provider_batch_by_id,
);
attach_lane_warnings(&mut lane_builders, &warnings);
let lanes = order_lanes(&root_run_id, lane_builders);
let spans = derive_spans(&events);
let attention = build_attention(
&events,
&workflows,
&continuations,
&provider_batches,
&fork_joins,
);
let overview = build_overview(&events, &lanes, &workflows, &continuations);
VisualizationDocument {
provenance: VisualizationProvenance {
generator: "langcontinuation batch visualizer".to_string(),
generator_version: env!("CARGO_PKG_VERSION").to_string(),
generated_at: generated_at.clone(),
root_run_id: root_run_id.clone(),
include_errors: options.include_errors || options.break_glass,
break_glass: options.break_glass,
max_events: options.max_events,
},
root_run_id,
overview,
events,
lanes,
spans,
workflows,
continuations,
provider_batches,
fork_joins,
warnings,
attention,
}
}
#[derive(Clone, Debug)]
struct LaneBuilder {
run_id: String,
parent_run_id: Option<String>,
fork_name: Option<String>,
first_created_at: Option<String>,
event_ids: Vec<String>,
current_status: Option<String>,
warnings: Vec<String>,
}
fn build_lane_builders(
raw_events: &[RawEvent],
workflow_by_run: &HashMap<String, &VisualizationWorkflow>,
) -> BTreeMap<String, LaneBuilder> {
let mut builders = BTreeMap::<String, LaneBuilder>::new();
let mut events_by_run = BTreeMap::<String, Vec<&RawEvent>>::new();
for event in raw_events {
events_by_run
.entry(event.run_id.clone())
.or_default()
.push(event);
}
for (run_id, events) in &mut events_by_run {
events.sort_by(|lhs, rhs| {
lhs.event_ordinal
.cmp(&rhs.event_ordinal)
.then_with(|| lhs.created_at.cmp(&rhs.created_at))
.then_with(|| lhs.event_id.cmp(&rhs.event_id))
});
let workflow = workflow_by_run.get(run_id);
let parent_run_id = workflow
.and_then(|workflow| workflow.parent_run_id.clone())
.or_else(|| events.iter().find_map(|event| event.parent_run_id.clone()));
let fork_name = workflow
.and_then(|workflow| workflow.fork_name.clone())
.or_else(|| events.iter().find_map(|event| event.fork_name.clone()));
builders.insert(
run_id.clone(),
LaneBuilder {
run_id: run_id.clone(),
parent_run_id,
fork_name,
first_created_at: events.first().map(|event| event.created_at.clone()),
event_ids: events
.iter()
.map(|event| event.event_id.to_string())
.collect(),
current_status: workflow.map(|workflow| workflow.status.clone()),
warnings: Vec::new(),
},
);
}
builders
}
fn attach_lane_warnings(
lane_builders: &mut BTreeMap<String, LaneBuilder>,
warnings: &[VisualizationWarning],
) {
for warning in warnings {
if let Some(run_id) = &warning.run_id {
if let Some(lane) = lane_builders.get_mut(run_id) {
lane.warnings.push(warning.message.clone());
}
}
}
}
fn order_lanes(
root_run_id: &str,
lane_builders: BTreeMap<String, LaneBuilder>,
) -> Vec<VisualizationLane> {
let mut children = BTreeMap::<String, Vec<String>>::new();
for lane in lane_builders.values() {
if let Some(parent) = &lane.parent_run_id {
children
.entry(parent.clone())
.or_default()
.push(lane.run_id.clone());
}
}
for child_runs in children.values_mut() {
child_runs.sort_by(|lhs, rhs| {
compare_lanes(
lane_builders.get(lhs),
lane_builders.get(rhs),
lhs.as_str(),
rhs.as_str(),
)
});
}
let mut ordered = Vec::new();
let mut seen = HashSet::<String>::new();
visit_lane(
root_run_id,
&lane_builders,
&children,
&mut seen,
&mut ordered,
);
let mut remaining = lane_builders
.keys()
.filter(|run_id| !seen.contains(*run_id))
.cloned()
.collect::<Vec<_>>();
remaining.sort_by(|lhs, rhs| {
compare_lanes(
lane_builders.get(lhs),
lane_builders.get(rhs),
lhs.as_str(),
rhs.as_str(),
)
});
for run_id in remaining {
visit_lane(&run_id, &lane_builders, &children, &mut seen, &mut ordered);
}
ordered
}
fn visit_lane(
run_id: &str,
lane_builders: &BTreeMap<String, LaneBuilder>,
children: &BTreeMap<String, Vec<String>>,
seen: &mut HashSet<String>,
ordered: &mut Vec<VisualizationLane>,
) {
if !seen.insert(run_id.to_string()) {
return;
}
if let Some(lane) = lane_builders.get(run_id) {
ordered.push(VisualizationLane {
run_id: lane.run_id.clone(),
parent_run_id: lane.parent_run_id.clone(),
fork_name: lane.fork_name.clone(),
first_created_at: lane.first_created_at.clone(),
event_ids: lane.event_ids.clone(),
current_status: lane.current_status.clone(),
warnings: lane.warnings.clone(),
});
}
if let Some(child_runs) = children.get(run_id) {
for child_run in child_runs {
visit_lane(child_run, lane_builders, children, seen, ordered);
}
}
}
fn compare_lanes(
lhs: Option<&LaneBuilder>,
rhs: Option<&LaneBuilder>,
lhs_run_id: &str,
rhs_run_id: &str,
) -> std::cmp::Ordering {
fork_sort_key(lhs.and_then(|lane| lane.fork_name.as_deref()))
.cmp(&fork_sort_key(
rhs.and_then(|lane| lane.fork_name.as_deref()),
))
.then_with(|| {
lhs.and_then(|lane| lane.first_created_at.as_ref())
.cmp(&rhs.and_then(|lane| lane.first_created_at.as_ref()))
})
.then_with(|| lhs_run_id.cmp(rhs_run_id))
}
fn fork_sort_key(fork_name: Option<&str>) -> (u8, &str) {
match fork_name {
Some("lhs") => (0, "lhs"),
Some("rhs") => (1, "rhs"),
Some(name) => (2, name),
None => (3, ""),
}
}
fn derive_spans(events: &[VisualizationEvent]) -> Vec<VisualizationSpan> {
let mut events_by_run = BTreeMap::<String, Vec<&VisualizationEvent>>::new();
for event in events {
events_by_run
.entry(event.run_id.clone())
.or_default()
.push(event);
}
let mut spans = Vec::new();
for (run_id, mut run_events) in events_by_run {
run_events.sort_by_key(|event| event.event_ordinal);
let mut open = BTreeMap::<String, &VisualizationEvent>::new();
for event in run_events {
for key in span_start_keys(event) {
open.insert(key, event);
}
for (key, kind) in span_end_keys(event) {
if let Some(start) = open.remove(&key) {
spans.push(VisualizationSpan {
kind,
run_id: run_id.clone(),
start_event_id: start.event_id.clone(),
end_event_id: Some(event.event_id.clone()),
label: span_label(start, event),
open: false,
});
}
}
}
for (key, start) in open {
spans.push(VisualizationSpan {
kind: key,
run_id: run_id.clone(),
start_event_id: start.event_id.clone(),
end_event_id: None,
label: format!("{} is open", start.event_type),
open: true,
});
}
}
spans
}
fn span_start_keys(event: &VisualizationEvent) -> Vec<String> {
let continuation = event
.continuation_id
.clone()
.unwrap_or_else(|| "none".to_string());
match event.event_type.as_str() {
"local_call.started" => vec!["local_call".to_string()],
"tool_call.started" => vec!["tool_call".to_string()],
"fork_join.started" => vec!["fork_join".to_string()],
"human.blocked" => vec![format!("human:{continuation}")],
"openai.blocked" => vec![format!("openai:{continuation}")],
"continuation.blocked" => vec![format!("continuation:{continuation}")],
"anthropic.suspended" => vec![format!("anthropic:{continuation}")],
"anthropic.submitted" => vec![format!("anthropic_provider:{continuation}")],
_ => Vec::new(),
}
}
fn span_end_keys(event: &VisualizationEvent) -> Vec<(String, String)> {
let continuation = event
.continuation_id
.clone()
.unwrap_or_else(|| "none".to_string());
match event.event_type.as_str() {
"local_call.completed" | "local_call.failed" => {
vec![("local_call".to_string(), "local_call".to_string())]
}
"tool_call.completed" | "tool_call.failed" => {
vec![("tool_call".to_string(), "tool_call".to_string())]
}
"fork_join.completed" | "fork_join.failed" => {
vec![("fork_join".to_string(), "fork_join".to_string())]
}
"human.resumed" => vec![(format!("human:{continuation}"), "human".to_string())],
"openai.resumed" => vec![(format!("openai:{continuation}"), "openai".to_string())],
"anthropic.resumed" => vec![(format!("anthropic:{continuation}"), "anthropic".to_string())],
"anthropic.completed" | "anthropic.failed" => vec![(
format!("anthropic_provider:{continuation}"),
"anthropic_provider".to_string(),
)],
_ => Vec::new(),
}
}
fn span_label(start: &VisualizationEvent, end: &VisualizationEvent) -> String {
format!("{} to {}", start.event_type, end.event_type)
}
fn build_warnings(
root_run_id: &str,
raw_events: &[RawEvent],
lane_builders: &BTreeMap<String, LaneBuilder>,
workflow_by_run: &HashMap<String, &VisualizationWorkflow>,
continuation_by_id: &HashMap<String, &VisualizationContinuation>,
provider_batch_by_id: &HashMap<String, &VisualizationProviderBatch>,
) -> Vec<VisualizationWarning> {
let mut warnings = Vec::new();
let mut ordinals = HashSet::<(String, i64)>::new();
let event_ids = raw_events
.iter()
.map(|event| event.event_id)
.collect::<HashSet<_>>();
for event in raw_events {
if event.root_run_id != root_run_id {
warnings.push(warning(
"event-root-mismatch",
format!(
"event {} has root_run_id {} but artifact root is {}",
event.event_id, event.root_run_id, root_run_id
),
Some(event.event_id.to_string()),
Some(event.run_id.clone()),
None,
None,
));
}
if !ordinals.insert((event.run_id.clone(), event.event_ordinal)) {
warnings.push(warning(
"duplicate-event-ordinal",
format!(
"run {} has duplicate event ordinal {}",
event.run_id, event.event_ordinal
),
Some(event.event_id.to_string()),
Some(event.run_id.clone()),
None,
None,
));
}
if let CausalRef::EventId { event_id } = &event.caused_by {
if !event_ids.contains(event_id) {
warnings.push(warning(
"unknown-causal-event",
format!(
"event {} references missing cause {}",
event.event_id, event_id
),
Some(event.event_id.to_string()),
Some(event.run_id.clone()),
None,
None,
));
}
}
if let Some(continuation_id) = &event.continuation_id {
if !continuation_by_id.contains_key(continuation_id) {
warnings.push(warning(
"missing-continuation-row",
format!(
"event {} references missing continuation {}",
event.event_id, continuation_id
),
Some(event.event_id.to_string()),
Some(event.run_id.clone()),
Some(continuation_id.clone()),
None,
));
}
}
for provider_batch_id in provider_batch_ids_in_value(&event.event) {
if !provider_batch_by_id.contains_key(&provider_batch_id) {
warnings.push(warning(
"missing-provider-batch-row",
format!(
"event {} references missing provider batch {}",
event.event_id, provider_batch_id
),
Some(event.event_id.to_string()),
Some(event.run_id.clone()),
None,
Some(provider_batch_id),
));
}
}
}
for run_id in lane_builders.keys() {
if !workflow_by_run.contains_key(run_id) {
warnings.push(warning(
"missing-workflow-row",
format!("run {run_id} has events but no current workflow row"),
None,
Some(run_id.clone()),
None,
None,
));
}
}
warnings
}
fn warning(
code: impl Into<String>,
message: impl Into<String>,
event_id: Option<String>,
run_id: Option<String>,
continuation_id: Option<String>,
provider_batch_id: Option<String>,
) -> VisualizationWarning {
VisualizationWarning {
code: code.into(),
message: message.into(),
event_id,
run_id,
continuation_id,
provider_batch_id,
}
}
fn build_attention(
events: &[VisualizationEvent],
workflows: &[VisualizationWorkflow],
continuations: &[VisualizationContinuation],
provider_batches: &[VisualizationProviderBatch],
fork_joins: &[VisualizationForkJoin],
) -> Vec<VisualizationAttention> {
let mut attention = Vec::new();
for event in events {
for category in &event.attention {
attention.push(VisualizationAttention {
kind: "event".to_string(),
severity: category.clone(),
label: format!(
"{} #{} {}",
event.run_id, event.event_ordinal, event.event_type
),
event_id: Some(event.event_id.clone()),
run_id: Some(event.run_id.clone()),
continuation_id: event.continuation_id.clone(),
provider_batch_id: provider_batch_id_in_value(&event.event),
});
}
}
for workflow in workflows {
if matches!(
workflow.status.as_str(),
"failed"
| "waiting_anthropic"
| "blocked_human"
| "blocked_openai"
| "waiting_fork_join"
) {
attention.push(VisualizationAttention {
kind: "workflow".to_string(),
severity: status_severity(&workflow.status).to_string(),
label: format!("workflow {} is {}", workflow.run_id, workflow.status),
event_id: None,
run_id: Some(workflow.run_id.clone()),
continuation_id: None,
provider_batch_id: None,
});
}
}
for continuation in continuations {
if matches!(
continuation.status.as_str(),
"pending" | "submitted" | "blocked" | "failed"
) {
attention.push(VisualizationAttention {
kind: "continuation".to_string(),
severity: status_severity(&continuation.status).to_string(),
label: format!(
"continuation {} is {}",
continuation.continuation_id, continuation.status
),
event_id: None,
run_id: Some(continuation.workflow_run_id.clone()),
continuation_id: Some(continuation.continuation_id.clone()),
provider_batch_id: continuation.provider_batch_id.clone(),
});
}
}
for provider_batch in provider_batches {
if matches!(provider_batch.status.as_str(), "in_progress" | "canceling") {
attention.push(VisualizationAttention {
kind: "provider_batch".to_string(),
severity: "waiting".to_string(),
label: format!(
"provider batch {} is {}",
provider_batch.provider_batch_id, provider_batch.status
),
event_id: None,
run_id: None,
continuation_id: None,
provider_batch_id: Some(provider_batch.provider_batch_id.clone()),
});
}
}
for fork_join in fork_joins {
if matches!(fork_join.status.as_str(), "waiting" | "failed") {
attention.push(VisualizationAttention {
kind: "fork_join".to_string(),
severity: status_severity(&fork_join.status).to_string(),
label: format!(
"fork join {} is {}",
fork_join.parent_run_id, fork_join.status
),
event_id: None,
run_id: Some(fork_join.parent_run_id.clone()),
continuation_id: None,
provider_batch_id: None,
});
}
}
attention
}
fn build_overview(
events: &[VisualizationEvent],
lanes: &[VisualizationLane],
workflows: &[VisualizationWorkflow],
continuations: &[VisualizationContinuation],
) -> VisualizationOverview {
let mut event_family_counts = BTreeMap::new();
for event in events {
*event_family_counts
.entry(event.event_family.clone())
.or_insert(0) += 1;
}
let mut workflow_status_counts = BTreeMap::new();
for workflow in workflows {
*workflow_status_counts
.entry(workflow.status.clone())
.or_insert(0) += 1;
}
let mut continuation_status_counts = BTreeMap::new();
for continuation in continuations {
*continuation_status_counts
.entry(continuation.status.clone())
.or_insert(0) += 1;
}
VisualizationOverview {
total_events: events.len(),
run_count: lanes.len(),
first_created_at: events.first().map(|event| event.created_at.clone()),
last_created_at: events.last().map(|event| event.created_at.clone()),
event_family_counts,
workflow_status_counts,
continuation_status_counts,
}
}
fn event_family(event_type: &str) -> String {
let family = event_type.split('.').next().unwrap_or("custom");
match family {
"workflow" | "local_call" | "anthropic" | "openai" | "human" | "tool_call"
| "fork_join" | "continuation" => family.to_string(),
_ => "custom".to_string(),
}
}
fn event_attention(event_type: &str) -> Vec<String> {
let mut attention = Vec::new();
if event_type.ends_with(".failed") || event_type == "workflow.failed" {
attention.push("failed".to_string());
}
if event_type.ends_with(".blocked") {
attention.push("blocked".to_string());
}
if event_type.ends_with(".retried") {
attention.push("retried".to_string());
}
if matches!(
event_type,
"anthropic.suspended"
| "anthropic.submitted"
| "human.blocked"
| "openai.blocked"
| "continuation.blocked"
| "fork_join.started"
) {
attention.push("waiting".to_string());
}
attention
}
fn event_severity(event_type: &str) -> String {
if event_type.ends_with(".failed") || event_type == "workflow.failed" {
"failed"
} else if event_type.ends_with(".blocked") {
"blocked"
} else if event_type.ends_with(".retried") {
"retried"
} else if matches!(
event_type,
"anthropic.suspended" | "anthropic.submitted" | "fork_join.started"
) {
"waiting"
} else if event_type.ends_with(".completed")
|| event_type.ends_with(".resumed")
|| event_type == "workflow.halted"
{
"success"
} else {
"normal"
}
.to_string()
}
fn status_severity(status: &str) -> &str {
match status {
"failed" => "failed",
"blocked" | "blocked_human" | "blocked_openai" => "blocked",
"pending" | "submitted" | "waiting" | "waiting_anthropic" | "waiting_fork_join"
| "in_progress" | "canceling" => "waiting",
_ => "normal",
}
}
fn event_label(event_type: &str, payload: &Value) -> String {
match event_type {
"workflow.enqueued" => string_field(payload, "kind")
.map(|kind| format!("enqueued {kind}"))
.unwrap_or_else(|| "enqueued".to_string()),
"workflow.halted" => "halted".to_string(),
"workflow.failed" => string_field(payload, "reason")
.map(|reason| format!("failed: {reason}"))
.unwrap_or_else(|| "failed".to_string()),
"local_call.started" => string_field(payload, "function")
.map(|function| format!("start {function}"))
.unwrap_or_else(|| "local call started".to_string()),
"local_call.completed" => string_field(payload, "function")
.map(|function| format!("completed {function}"))
.unwrap_or_else(|| "local call completed".to_string()),
"local_call.failed" => string_field(payload, "function")
.map(|function| format!("failed {function}"))
.unwrap_or_else(|| "local call failed".to_string()),
"anthropic.suspended" => string_field(payload, "provider")
.map(|provider| format!("suspended for {provider}"))
.unwrap_or_else(|| "anthropic suspended".to_string()),
"anthropic.submitted" => string_field(payload, "provider_batch_id")
.map(|batch| format!("submitted to {batch}"))
.unwrap_or_else(|| "anthropic submitted".to_string()),
"anthropic.completed" => "anthropic completed".to_string(),
"anthropic.resumed" => string_field(payload, "output_key")
.map(|output| format!("resumed into {output}"))
.unwrap_or_else(|| "anthropic resumed".to_string()),
"anthropic.failed" => "anthropic failed".to_string(),
"anthropic.retried" => string_field(payload, "previous_continuation_id")
.map(|id| format!("retry {id}"))
.unwrap_or_else(|| "anthropic retried".to_string()),
"human.blocked" => "human blocked".to_string(),
"human.resumed" => "human resumed".to_string(),
"openai.blocked" => "openai blocked".to_string(),
"openai.resumed" => "openai resumed".to_string(),
"tool_call.started" => "tool calls started".to_string(),
"tool_call.completed" => "tool calls completed".to_string(),
"tool_call.failed" => string_field(payload, "reason")
.map(|reason| format!("tool calls failed: {reason}"))
.unwrap_or_else(|| "tool calls failed".to_string()),
"fork_join.started" => "fork started".to_string(),
"fork_join.completed" => "fork joined".to_string(),
"fork_join.failed" => "fork failed".to_string(),
_ => event_type.to_string(),
}
}
fn event_summary(event_type: &str, payload: &Value) -> Vec<VisualizationField> {
let mut fields = Vec::new();
match event_type {
"workflow.enqueued" => {
push_field(&mut fields, payload, "kind");
push_json_field(&mut fields, payload, "next_action");
push_field(&mut fields, payload, "parent_run_id");
push_field(&mut fields, payload, "fork_name");
}
"workflow.halted" => {
push_field(&mut fields, payload, "env_key_count");
}
"workflow.failed" => {
push_field(&mut fields, payload, "reason");
push_json_field(&mut fields, payload, "error_ref");
}
"local_call.started" => {
push_field(&mut fields, payload, "function");
}
"local_call.completed" | "local_call.failed" => {
push_field(&mut fields, payload, "function");
push_field(&mut fields, payload, "duration_ms");
push_json_field(&mut fields, payload, "env_changes");
push_json_field(&mut fields, payload, "flow");
push_json_field(&mut fields, payload, "error");
}
"anthropic.suspended" => {
push_field(&mut fields, payload, "provider");
push_field(&mut fields, payload, "output_key");
push_field(&mut fields, payload, "continuation_id");
push_json_field(&mut fields, payload, "request_ref");
}
"anthropic.submitted" => {
push_field(&mut fields, payload, "continuation_id");
push_field(&mut fields, payload, "provider_batch_id");
}
"anthropic.completed" => {
push_field(&mut fields, payload, "continuation_id");
push_field(&mut fields, payload, "provider_batch_id");
push_field(&mut fields, payload, "provider_message_id");
push_field(&mut fields, payload, "attempt");
push_usage_summary(&mut fields, payload);
push_json_field(&mut fields, payload, "response_ref");
}
"anthropic.resumed" => {
push_field(&mut fields, payload, "continuation_id");
push_field(&mut fields, payload, "output_key");
}
"anthropic.failed" => {
push_field(&mut fields, payload, "continuation_id");
push_field(&mut fields, payload, "provider_batch_id");
push_field(&mut fields, payload, "attempt");
push_field(&mut fields, payload, "result_type");
push_json_field(&mut fields, payload, "error_ref");
}
"anthropic.retried" => {
push_field(&mut fields, payload, "previous_continuation_id");
push_field(&mut fields, payload, "new_continuation_id");
push_field(&mut fields, payload, "attempt");
push_field(&mut fields, payload, "provider");
}
"human.blocked" | "openai.blocked" | "continuation.blocked" => {
push_field(&mut fields, payload, "kind");
push_field(&mut fields, payload, "output_key");
push_field(&mut fields, payload, "continuation_id");
}
"human.resumed" | "openai.resumed" => {
push_field(&mut fields, payload, "continuation_id");
push_json_field(&mut fields, payload, "response_ref");
}
"tool_call.started" => {
push_json_field(&mut fields, payload, "tool_names");
push_json_field(&mut fields, payload, "tool_call_ids");
}
"tool_call.completed" => {
push_field(&mut fields, payload, "tool_count");
push_field(&mut fields, payload, "result_error_count");
}
"tool_call.failed" => {
push_field(&mut fields, payload, "reason");
push_field(&mut fields, payload, "source");
}
"fork_join.started" | "fork_join.completed" => {
push_json_field(&mut fields, payload, "branch_run_id");
push_json_field(&mut fields, payload, "terminal_event_id");
push_field(&mut fields, payload, "join_function");
}
"fork_join.failed" => {
push_json_field(&mut fields, payload, "error_ref");
}
_ => push_generic_summary(&mut fields, payload),
}
fields
}
fn push_usage_summary(fields: &mut Vec<VisualizationField>, payload: &Value) {
let Some(usage) = payload.get("usage") else {
return;
};
for key in [
"input_tokens",
"output_tokens",
"cache_creation_input_tokens",
"cache_read_input_tokens",
] {
if let Some(value) = usage.get(key).and_then(display_value) {
fields.push(VisualizationField {
key: format!("usage.{key}"),
value,
});
}
}
}
fn push_generic_summary(fields: &mut Vec<VisualizationField>, payload: &Value) {
let Some(object) = payload.as_object() else {
return;
};
for (key, value) in object.iter().take(6) {
if let Some(value) = display_value(value) {
fields.push(VisualizationField {
key: key.clone(),
value,
});
}
}
}
fn push_field(fields: &mut Vec<VisualizationField>, payload: &Value, key: &str) {
if let Some(value) = payload.get(key).and_then(display_value) {
fields.push(VisualizationField {
key: key.to_string(),
value,
});
}
}
fn push_json_field(fields: &mut Vec<VisualizationField>, payload: &Value, key: &str) {
if let Some(value) = payload.get(key) {
fields.push(VisualizationField {
key: key.to_string(),
value: compact_json(value),
});
}
}
fn string_field(payload: &Value, key: &str) -> Option<String> {
payload
.get(key)
.and_then(Value::as_str)
.map(ToString::to_string)
}
fn display_value(value: &Value) -> Option<String> {
match value {
Value::Null => None,
Value::Bool(value) => Some(value.to_string()),
Value::Number(value) => Some(value.to_string()),
Value::String(value) => Some(value.clone()),
Value::Array(_) | Value::Object(_) => Some(compact_json(value)),
}
}
fn compact_json(value: &Value) -> String {
serde_json::to_string(value).unwrap_or_else(|_| "<invalid-json>".to_string())
}
fn caused_by_event_id(caused_by: &CausalRef) -> Option<String> {
match caused_by {
CausalRef::RunId { .. } => None,
CausalRef::EventId { event_id } => Some(event_id.to_string()),
}
}
fn unique_run_ids(events: &[RawEvent]) -> Vec<String> {
events
.iter()
.map(|event| event.run_id.clone())
.collect::<BTreeSet<_>>()
.into_iter()
.collect()
}
fn event_continuation_ids(events: &[RawEvent]) -> Vec<String> {
events
.iter()
.filter_map(|event| event.continuation_id.clone())
.collect::<BTreeSet<_>>()
.into_iter()
.collect()
}
fn referenced_provider_batch_ids(
events: &[RawEvent],
continuations: &[VisualizationContinuation],
) -> Vec<String> {
let mut ids = BTreeSet::new();
for event in events {
ids.extend(provider_batch_ids_in_value(&event.event));
}
for continuation in continuations {
if let Some(provider_batch_id) = &continuation.provider_batch_id {
ids.insert(provider_batch_id.clone());
}
}
ids.into_iter().collect()
}
fn provider_batch_id_in_value(value: &Value) -> Option<String> {
provider_batch_ids_in_value(value).into_iter().next()
}
fn provider_batch_ids_in_value(value: &Value) -> BTreeSet<String> {
let mut ids = BTreeSet::new();
collect_provider_batch_ids(value, &mut ids);
ids
}
fn collect_provider_batch_ids(value: &Value, ids: &mut BTreeSet<String>) {
match value {
Value::Object(object) => {
for (key, value) in object {
if key == "provider_batch_id" {
if let Some(value) = value.as_str() {
ids.insert(value.to_string());
}
}
collect_provider_batch_ids(value, ids);
}
}
Value::Array(values) => {
for value in values {
collect_provider_batch_ids(value, ids);
}
}
Value::Null | Value::Bool(_) | Value::Number(_) | Value::String(_) => {}
}
}
async fn load_generated_at(pool: &PgPool) -> Result<String> {
let row = sqlx::query("SELECT now()::TEXT AS generated_at")
.fetch_one(pool)
.await
.map_err(|err| sqlx_error("visualizer-generated-at", err))?;
row.try_get("generated_at")
.map_err(|err| sqlx_error("visualizer-generated-at", err))
}
async fn load_event_count(pool: &PgPool, root_run_id: &str) -> Result<usize> {
let row =
sqlx::query("SELECT count(*) AS count FROM batch_workflow_events WHERE root_run_id = $1")
.bind(root_run_id)
.fetch_one(pool)
.await
.map_err(|err| sqlx_error("visualizer-count-events", err))?;
let count: i64 = row
.try_get("count")
.map_err(|err| sqlx_error("visualizer-count-events", err))?;
usize::try_from(count).map_err(|_| {
batch_error(
"visualizer-event-count-overflow",
"event count does not fit in usize",
)
.with_atom_field("event_count", count)
})
}
async fn load_events(pool: &PgPool, root_run_id: &str) -> Result<Vec<RawEvent>> {
let rows = sqlx::query(
"SELECT event_id, root_run_id, run_id, parent_run_id, fork_name, \
event_ordinal, caused_by, event_type, event_version, \
continuation_id, event, created_at::TEXT AS created_at \
FROM batch_workflow_events \
WHERE root_run_id = $1 ORDER BY created_at, event_id",
)
.bind(root_run_id)
.fetch_all(pool)
.await
.map_err(|err| sqlx_error("visualizer-load-events", err))?;
rows.into_iter().map(raw_event_from_row).collect()
}
fn raw_event_from_row(row: sqlx::postgres::PgRow) -> Result<RawEvent> {
let caused_by: Value = row
.try_get("caused_by")
.map_err(|err| sqlx_error("visualizer-event-row", err))?;
Ok(RawEvent {
event_id: row
.try_get("event_id")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
root_run_id: row
.try_get("root_run_id")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
run_id: row
.try_get("run_id")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
parent_run_id: row
.try_get("parent_run_id")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
fork_name: row
.try_get("fork_name")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
event_ordinal: row
.try_get("event_ordinal")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
caused_by: serde_json::from_value(caused_by).map_err(|err| {
batch_error(
"visualizer-causal-ref-json",
"failed to decode event causal reference",
)
.with_string_field("source", &err.to_string())
})?,
event_type: row
.try_get("event_type")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
event_version: row
.try_get("event_version")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
continuation_id: row
.try_get("continuation_id")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
event: row
.try_get("event")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
created_at: row
.try_get("created_at")
.map_err(|err| sqlx_error("visualizer-event-row", err))?,
})
}
async fn load_workflows(
pool: &PgPool,
root_run_id: &str,
options: &VisualizationOptions,
) -> Result<Vec<VisualizationWorkflow>> {
let rows = sqlx::query(
"SELECT run_id, root_run_id, status, parent_run_id, fork_name, \
error_sexpr IS NOT NULL AS error_present, \
CASE WHEN $2 THEN error_sexpr ELSE NULL END AS error_sexpr, \
next_event_ordinal, created_at::TEXT AS created_at, \
updated_at::TEXT AS updated_at, quiescent, \
quiesced_at::TEXT AS quiesced_at, \
CASE WHEN $3 THEN workflow ELSE NULL::jsonb END AS workflow \
FROM batch_workflows WHERE root_run_id = $1 ORDER BY created_at, id",
)
.bind(root_run_id)
.bind(options.include_errors || options.break_glass)
.bind(options.break_glass)
.fetch_all(pool)
.await
.map_err(|err| sqlx_error("visualizer-load-workflows", err))?;
rows.into_iter().map(workflow_from_row).collect()
}
fn workflow_from_row(row: sqlx::postgres::PgRow) -> Result<VisualizationWorkflow> {
Ok(VisualizationWorkflow {
run_id: row
.try_get("run_id")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
root_run_id: row
.try_get("root_run_id")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
status: row
.try_get("status")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
parent_run_id: row
.try_get("parent_run_id")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
fork_name: row
.try_get("fork_name")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
error_present: row
.try_get("error_present")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
error_sexpr: row
.try_get("error_sexpr")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
next_event_ordinal: row
.try_get("next_event_ordinal")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
created_at: row
.try_get("created_at")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
updated_at: row
.try_get("updated_at")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
quiescent: row
.try_get("quiescent")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
quiesced_at: row
.try_get("quiesced_at")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
workflow: row
.try_get("workflow")
.map_err(|err| sqlx_error("visualizer-workflow-row", err))?,
})
}
async fn load_continuations(
pool: &PgPool,
run_ids: &[String],
continuation_ids: &[String],
options: &VisualizationOptions,
) -> Result<Vec<VisualizationContinuation>> {
let rows = sqlx::query(
"SELECT continuation_id, workflow_run_id, kind, status, provider, \
provider_batch_id, provider_message_id, output_key, result_type, \
error_sexpr IS NOT NULL AS error_present, \
CASE WHEN $3 THEN error_sexpr ELSE NULL END AS error_sexpr, \
attempt_of, attempt, input_tokens, output_tokens, \
cache_creation_input_tokens, cache_read_input_tokens, \
created_at::TEXT AS created_at, updated_at::TEXT AS updated_at, \
submitted_at::TEXT AS submitted_at, completed_at::TEXT AS completed_at, \
quiescent, quiesced_at::TEXT AS quiesced_at, \
CASE WHEN $4 THEN request ELSE NULL::jsonb END AS request, \
CASE WHEN $4 THEN response ELSE NULL::jsonb END AS response, \
CASE WHEN $4 THEN usage ELSE NULL::jsonb END AS usage, \
CASE WHEN $4 THEN server_tool_use ELSE NULL::jsonb END AS server_tool_use \
FROM batch_continuations \
WHERE workflow_run_id = ANY($1::text[]) OR continuation_id = ANY($2::text[]) \
ORDER BY created_at, id",
)
.bind(run_ids)
.bind(continuation_ids)
.bind(options.include_errors || options.break_glass)
.bind(options.break_glass)
.fetch_all(pool)
.await
.map_err(|err| sqlx_error("visualizer-load-continuations", err))?;
rows.into_iter().map(continuation_from_row).collect()
}
fn continuation_from_row(row: sqlx::postgres::PgRow) -> Result<VisualizationContinuation> {
Ok(VisualizationContinuation {
continuation_id: row
.try_get("continuation_id")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
workflow_run_id: row
.try_get("workflow_run_id")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
kind: row
.try_get("kind")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
status: row
.try_get("status")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
provider: row
.try_get("provider")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
provider_batch_id: row
.try_get("provider_batch_id")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
provider_message_id: row
.try_get("provider_message_id")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
output_key: row
.try_get("output_key")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
result_type: row
.try_get("result_type")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
error_present: row
.try_get("error_present")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
error_sexpr: row
.try_get("error_sexpr")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
attempt_of: row
.try_get("attempt_of")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
attempt: row
.try_get("attempt")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
input_tokens: row
.try_get("input_tokens")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
output_tokens: row
.try_get("output_tokens")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
cache_creation_input_tokens: row
.try_get("cache_creation_input_tokens")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
cache_read_input_tokens: row
.try_get("cache_read_input_tokens")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
created_at: row
.try_get("created_at")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
updated_at: row
.try_get("updated_at")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
submitted_at: row
.try_get("submitted_at")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
completed_at: row
.try_get("completed_at")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
quiescent: row
.try_get("quiescent")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
quiesced_at: row
.try_get("quiesced_at")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
request: row
.try_get("request")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
response: row
.try_get("response")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
usage: row
.try_get("usage")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
server_tool_use: row
.try_get("server_tool_use")
.map_err(|err| sqlx_error("visualizer-continuation-row", err))?,
})
}
async fn load_provider_batches(
pool: &PgPool,
provider_batch_ids: &[String],
options: &VisualizationOptions,
) -> Result<Vec<VisualizationProviderBatch>> {
if provider_batch_ids.is_empty() {
return Ok(Vec::new());
}
let rows = sqlx::query(
"SELECT provider, provider_batch_id, status, request_count, \
error_sexpr IS NOT NULL AS error_present, \
CASE WHEN $2 THEN error_sexpr ELSE NULL END AS error_sexpr, \
created_at::TEXT AS created_at, updated_at::TEXT AS updated_at, \
submitted_at::TEXT AS submitted_at, completed_at::TEXT AS completed_at, \
quiescent, quiesced_at::TEXT AS quiesced_at, \
CASE WHEN $3 THEN response ELSE NULL::jsonb END AS response \
FROM batch_provider_batches \
WHERE provider_batch_id = ANY($1::text[]) ORDER BY submitted_at, id",
)
.bind(provider_batch_ids)
.bind(options.include_errors || options.break_glass)
.bind(options.break_glass)
.fetch_all(pool)
.await
.map_err(|err| sqlx_error("visualizer-load-provider-batches", err))?;
rows.into_iter().map(provider_batch_from_row).collect()
}
fn provider_batch_from_row(row: sqlx::postgres::PgRow) -> Result<VisualizationProviderBatch> {
Ok(VisualizationProviderBatch {
provider: row
.try_get("provider")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
provider_batch_id: row
.try_get("provider_batch_id")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
status: row
.try_get("status")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
request_count: row
.try_get("request_count")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
error_present: row
.try_get("error_present")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
error_sexpr: row
.try_get("error_sexpr")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
created_at: row
.try_get("created_at")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
updated_at: row
.try_get("updated_at")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
submitted_at: row
.try_get("submitted_at")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
completed_at: row
.try_get("completed_at")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
quiescent: row
.try_get("quiescent")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
quiesced_at: row
.try_get("quiesced_at")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
response: row
.try_get("response")
.map_err(|err| sqlx_error("visualizer-provider-batch-row", err))?,
})
}
async fn load_fork_joins(
pool: &PgPool,
run_ids: &[String],
options: &VisualizationOptions,
) -> Result<Vec<VisualizationForkJoin>> {
let rows = sqlx::query(
"SELECT parent_run_id, lhs_run_id, rhs_run_id, status, \
error_sexpr IS NOT NULL AS error_present, \
CASE WHEN $2 THEN error_sexpr ELSE NULL END AS error_sexpr, \
created_at::TEXT AS created_at, updated_at::TEXT AS updated_at, \
quiescent, quiesced_at::TEXT AS quiesced_at \
FROM batch_fork_joins \
WHERE parent_run_id = ANY($1::text[]) \
OR lhs_run_id = ANY($1::text[]) \
OR rhs_run_id = ANY($1::text[]) \
ORDER BY created_at, id",
)
.bind(run_ids)
.bind(options.include_errors || options.break_glass)
.fetch_all(pool)
.await
.map_err(|err| sqlx_error("visualizer-load-fork-joins", err))?;
rows.into_iter().map(fork_join_from_row).collect()
}
fn fork_join_from_row(row: sqlx::postgres::PgRow) -> Result<VisualizationForkJoin> {
Ok(VisualizationForkJoin {
parent_run_id: row
.try_get("parent_run_id")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
lhs_run_id: row
.try_get("lhs_run_id")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
rhs_run_id: row
.try_get("rhs_run_id")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
status: row
.try_get("status")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
error_present: row
.try_get("error_present")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
error_sexpr: row
.try_get("error_sexpr")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
created_at: row
.try_get("created_at")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
updated_at: row
.try_get("updated_at")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
quiescent: row
.try_get("quiescent")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
quiesced_at: row
.try_get("quiesced_at")
.map_err(|err| sqlx_error("visualizer-fork-join-row", err))?,
})
}
fn escape_json_for_html_script(json: &str) -> String {
json.replace('<', "\\u003c")
.replace('>', "\\u003e")
.replace('&', "\\u0026")
}
fn escape_html_text(text: &str) -> String {
text.replace('&', "&")
.replace('<', "<")
.replace('>', ">")
.replace('"', """)
}
const HTML_STYLE: &str = r#"<style>
:root {
color-scheme: light;
--bg: #f7f7f4;
--panel: #ffffff;
--text: #1f2328;
--muted: #667085;
--line: #d8d8d0;
--strong-line: #b8bbb0;
--normal: #425466;
--success: #1f7a4d;
--waiting: #8a5b00;
--blocked: #9a3412;
--retried: #6f42c1;
--failed: #b42318;
--focus: #0969da;
}
* { box-sizing: border-box; }
body {
margin: 0;
background: var(--bg);
color: var(--text);
font: 13px/1.42 ui-sans-serif, system-ui, -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
}
button, input, select {
font: inherit;
}
button {
border: 1px solid var(--strong-line);
background: #fff;
color: var(--text);
min-height: 28px;
padding: 3px 8px;
cursor: pointer;
}
button:hover { border-color: var(--focus); }
code, pre, .mono { font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; }
pre {
overflow: auto;
margin: 6px 0 0;
padding: 8px;
background: #f1f3f5;
border: 1px solid var(--line);
max-height: 360px;
}
a { color: var(--focus); text-decoration: none; }
a:hover { text-decoration: underline; }
.topbar {
position: sticky;
top: 0;
z-index: 10;
background: rgba(247, 247, 244, 0.96);
border-bottom: 1px solid var(--line);
padding: 10px 14px;
}
.title-row {
display: flex;
align-items: baseline;
gap: 10px;
flex-wrap: wrap;
margin-bottom: 8px;
}
h1 {
font-size: 18px;
line-height: 1.2;
margin: 0;
font-weight: 700;
}
.subtle { color: var(--muted); }
.badge {
display: inline-flex;
align-items: center;
gap: 4px;
border: 1px solid var(--line);
background: #fff;
padding: 1px 6px;
min-height: 22px;
white-space: nowrap;
}
.badge.failed, .marker.failed { border-color: #f4b0aa; background: #fff1f0; color: var(--failed); }
.badge.blocked, .marker.blocked { border-color: #fdba74; background: #fff7ed; color: var(--blocked); }
.badge.waiting, .marker.waiting { border-color: #facc15; background: #fefce8; color: var(--waiting); }
.badge.retried, .marker.retried { border-color: #d8b4fe; background: #faf5ff; color: var(--retried); }
.badge.success, .marker.success { border-color: #9bd7b3; background: #effaf3; color: var(--success); }
.controls {
display: grid;
grid-template-columns: minmax(220px, 1fr) repeat(3, minmax(120px, 170px)) repeat(3, auto);
gap: 8px;
align-items: center;
}
.controls input, .controls select {
width: 100%;
min-height: 30px;
border: 1px solid var(--strong-line);
background: #fff;
padding: 4px 8px;
}
.overview, .section {
border-bottom: 1px solid var(--line);
padding: 12px 14px;
}
.overview-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(190px, 1fr));
gap: 8px 12px;
}
.kv {
display: grid;
grid-template-columns: 110px 1fr;
gap: 4px 8px;
}
.kv dt { color: var(--muted); }
.kv dd { margin: 0; min-width: 0; overflow-wrap: anywhere; }
.attention-list, .warning-list {
display: flex;
gap: 6px;
flex-wrap: wrap;
margin-top: 8px;
}
.main {
padding: 0 0 32px;
}
.lane {
border-bottom: 1px solid var(--line);
background: rgba(255,255,255,0.45);
}
.lane-header {
position: sticky;
top: 83px;
z-index: 5;
display: flex;
align-items: center;
gap: 8px;
flex-wrap: wrap;
min-height: 38px;
padding: 7px 14px;
background: rgba(250,250,247,0.96);
border-bottom: 1px solid var(--line);
}
.lane-events {
padding: 10px 14px 12px;
display: grid;
gap: 7px;
}
.event-node {
display: grid;
grid-template-columns: minmax(260px, 410px) minmax(0, 1fr);
gap: 8px;
align-items: start;
}
.marker {
text-align: left;
border: 1px solid var(--line);
background: #fff;
color: var(--normal);
min-height: 34px;
padding: 5px 8px;
}
.marker-main {
display: flex;
align-items: center;
gap: 6px;
flex-wrap: wrap;
}
.marker-label { overflow-wrap: anywhere; }
.marker-meta {
display: flex;
gap: 8px;
color: var(--muted);
font-size: 12px;
margin-top: 2px;
}
.event-detail {
display: none;
border-left: 3px solid var(--line);
padding: 5px 0 8px 10px;
min-width: 0;
}
.event-node.open .event-detail { display: block; }
.event-node.hidden { display: none; }
.event-node.highlight .marker {
outline: 2px solid var(--focus);
outline-offset: 1px;
}
.detail-actions {
display: flex;
gap: 8px;
flex-wrap: wrap;
margin-bottom: 6px;
}
.tables {
display: grid;
gap: 12px;
}
table {
width: 100%;
border-collapse: collapse;
background: #fff;
}
th, td {
border: 1px solid var(--line);
text-align: left;
padding: 5px 6px;
vertical-align: top;
}
th {
background: #f1f3ef;
color: #3a3f45;
font-weight: 650;
}
td { overflow-wrap: anywhere; }
.raw-json { width: 100%; min-height: 260px; }
.collapsed .lane-events { display: none; }
@media (max-width: 900px) {
.controls { grid-template-columns: 1fr 1fr; }
.event-node { grid-template-columns: 1fr; }
.lane-header { top: 126px; }
}
</style>
"#;
const HTML_BODY: &str = r#"<header class="topbar">
<div class="title-row">
<h1 id="page-title"></h1>
<span id="mode-badges"></span>
</div>
<div class="controls" role="search">
<input id="search" type="search" placeholder="Search events, ids, continuations, batches, JSON">
<select id="family-filter" aria-label="event family"></select>
<select id="run-filter" aria-label="run id"></select>
<select id="attention-filter" aria-label="attention state"></select>
<button id="expand-attention" type="button">Expand Attention</button>
<button id="collapse-all" type="button">Collapse All</button>
<button id="download-json" type="button">Download JSON</button>
</div>
</header>
<main class="main">
<section id="overview" class="overview"></section>
<section id="lanes" aria-label="workflow swimlanes"></section>
<section id="spans" class="section"></section>
<section id="state" class="section"></section>
<section id="raw" class="section"></section>
</main>
"#;
const HTML_SCRIPT: &str = r##"<script>
(() => {
"use strict";
const doc = JSON.parse(document.getElementById("visualization-data").textContent);
const eventById = new Map(doc.events.map((event) => [event.event_id, event]));
const nodeById = new Map();
const firstTs = Date.parse(doc.overview.first_created_at || "");
function text(value) {
return document.createTextNode(value == null ? "" : String(value));
}
function el(tag, attrs, ...children) {
const node = document.createElement(tag);
for (const [key, value] of Object.entries(attrs || {})) {
if (value == null || value === false) continue;
if (key === "class") node.className = value;
else if (key === "dataset") Object.assign(node.dataset, value);
else if (key === "text") node.textContent = value;
else node.setAttribute(key, value === true ? "" : String(value));
}
for (const child of children) {
if (child == null) continue;
node.appendChild(typeof child === "string" ? text(child) : child);
}
return node;
}
function timestampTitle(row) {
const keys = ["created_at", "updated_at", "submitted_at", "completed_at", "quiesced_at"];
return keys.filter((key) => row[key]).map((key) => `${key}: ${row[key]}`).join("\n");
}
function rel(ts) {
const parsed = Date.parse(ts || "");
if (!Number.isFinite(parsed) || !Number.isFinite(firstTs)) return "+?";
const delta = Math.max(0, parsed - firstTs);
if (delta < 1000) return `+${delta}ms`;
if (delta < 60000) return `+${(delta / 1000).toFixed(delta < 10000 ? 2 : 1)}s`;
if (delta < 3600000) return `+${(delta / 60000).toFixed(1)}m`;
return `+${(delta / 3600000).toFixed(1)}h`;
}
function badge(label, severity, title) {
return el("span", { class: `badge ${severity || ""}`, title }, label);
}
function linkButton(label, id) {
const button = el("button", { type: "button" }, label);
button.addEventListener("click", () => focusEvent(id));
return button;
}
function focusEvent(id) {
const node = nodeById.get(id);
if (!node) return;
node.classList.add("open", "highlight");
node.scrollIntoView({ block: "center", behavior: "smooth" });
setTimeout(() => node.classList.remove("highlight"), 1800);
history.replaceState(null, "", `#event-${id}`);
}
function renderOverview() {
document.getElementById("page-title").textContent = `Batch workflow ${doc.root_run_id}`;
const modes = document.getElementById("mode-badges");
modes.replaceChildren(
badge(`${doc.overview.total_events} events`, "normal"),
badge(`${doc.overview.run_count} runs`, "normal"),
badge(doc.provenance.break_glass ? "BREAK GLASS" : "redacted", doc.provenance.break_glass ? "failed" : "normal"),
badge(doc.provenance.include_errors ? "full errors" : "error presence only", doc.provenance.include_errors ? "blocked" : "normal"),
);
const overview = document.getElementById("overview");
overview.replaceChildren();
const grid = el("div", { class: "overview-grid" });
grid.appendChild(kvBlock("Provenance", [
["generated", doc.provenance.generated_at],
["root", doc.root_run_id],
["generator", `${doc.provenance.generator} ${doc.provenance.generator_version}`],
["max events", doc.provenance.max_events],
]));
grid.appendChild(kvBlock("Timeline", [
["first event", doc.overview.first_created_at || ""],
["last event", doc.overview.last_created_at || ""],
["ordering", "created_at, event_id; ordinals are per lane"],
]));
grid.appendChild(kvBlock("Event Families", Object.entries(doc.overview.event_family_counts)));
grid.appendChild(kvBlock("Current State", [
...Object.entries(doc.overview.workflow_status_counts).map(([k, v]) => [`workflow.${k}`, v]),
...Object.entries(doc.overview.continuation_status_counts).map(([k, v]) => [`continuation.${k}`, v]),
]));
overview.appendChild(grid);
if (doc.attention.length) {
overview.appendChild(el("h2", { text: "Attention" }));
const list = el("div", { class: "attention-list" });
for (const item of doc.attention) {
const label = `${item.severity}: ${item.label}`;
const target = item.event_id;
const node = target ? linkButton(label, target) : badge(label, item.severity);
list.appendChild(node);
}
overview.appendChild(list);
}
if (doc.warnings.length) {
overview.appendChild(el("h2", { text: "Warnings" }));
const list = el("div", { class: "warning-list" });
for (const warning of doc.warnings) {
const label = `${warning.code}: ${warning.message}`;
list.appendChild(warning.event_id ? linkButton(label, warning.event_id) : badge(label, "blocked"));
}
overview.appendChild(list);
}
}
function kvBlock(title, pairs) {
const block = el("div", {});
block.appendChild(el("h2", { text: title }));
const dl = el("dl", { class: "kv" });
for (const [key, value] of pairs) {
dl.appendChild(el("dt", {}, String(key)));
dl.appendChild(el("dd", {}, String(value)));
}
block.appendChild(dl);
return block;
}
function populateFilters() {
const family = document.getElementById("family-filter");
family.replaceChildren(el("option", { value: "", text: "All families" }));
for (const name of Object.keys(doc.overview.event_family_counts).sort()) {
family.appendChild(el("option", { value: name, text: name }));
}
const run = document.getElementById("run-filter");
run.replaceChildren(el("option", { value: "", text: "All runs" }));
for (const lane of doc.lanes) {
run.appendChild(el("option", { value: lane.run_id, text: lane.run_id }));
}
const attention = document.getElementById("attention-filter");
attention.replaceChildren(
el("option", { value: "", text: "All attention states" }),
el("option", { value: "failed", text: "Failed" }),
el("option", { value: "blocked", text: "Blocked" }),
el("option", { value: "retried", text: "Retried" }),
el("option", { value: "waiting", text: "Waiting" }),
);
for (const id of ["search", "family-filter", "run-filter", "attention-filter"]) {
document.getElementById(id).addEventListener("input", applyFilters);
}
document.addEventListener("keydown", (event) => {
if (event.key === "/" && event.target.tagName !== "INPUT") {
event.preventDefault();
document.getElementById("search").focus();
}
});
document.getElementById("expand-attention").addEventListener("click", expandAttention);
document.getElementById("collapse-all").addEventListener("click", collapseAll);
document.getElementById("download-json").addEventListener("click", downloadJson);
}
function renderLanes() {
const root = document.getElementById("lanes");
root.replaceChildren();
const workflowByRun = new Map(doc.workflows.map((workflow) => [workflow.run_id, workflow]));
for (const lane of doc.lanes) {
const workflow = workflowByRun.get(lane.run_id);
const section = el("section", { class: "lane", dataset: { runId: lane.run_id } });
const header = el("div", { class: "lane-header" });
const collapse = el("button", { type: "button" }, "Toggle");
collapse.addEventListener("click", () => section.classList.toggle("collapsed"));
header.appendChild(collapse);
header.appendChild(el("strong", { class: "mono", title: lane.first_created_at || "" }, lane.run_id));
if (lane.fork_name) header.appendChild(badge(`fork ${lane.fork_name}`, "normal"));
if (lane.parent_run_id) header.appendChild(badge(`parent ${lane.parent_run_id}`, "normal"));
if (workflow) header.appendChild(badge(`workflow ${workflow.status}`, statusSeverity(workflow.status), timestampTitle(workflow)));
for (const warning of lane.warnings) header.appendChild(badge(warning, "blocked"));
section.appendChild(header);
const events = el("div", { class: "lane-events" });
for (const eventId of lane.event_ids) {
const event = eventById.get(eventId);
if (event) events.appendChild(renderEvent(event));
}
section.appendChild(events);
root.appendChild(section);
}
}
function renderEvent(event) {
const node = el("article", {
class: "event-node",
id: `event-${event.event_id}`,
dataset: {
eventId: event.event_id,
runId: event.run_id,
family: event.event_family,
attention: event.attention.join(" "),
search: eventSearchText(event),
},
});
nodeById.set(event.event_id, node);
const marker = el("button", {
type: "button",
class: `marker ${event.severity}`,
title: event.created_at,
});
marker.appendChild(el("div", { class: "marker-main" },
el("span", { class: "mono" }, `#${event.event_ordinal}`),
el("strong", {}, event.event_type),
el("span", { class: "marker-label" }, event.label),
));
marker.appendChild(el("div", { class: "marker-meta" },
el("span", {}, rel(event.created_at)),
el("span", { class: "mono" }, shortId(event.event_id)),
));
marker.addEventListener("click", () => node.classList.toggle("open"));
node.appendChild(marker);
node.appendChild(renderEventDetail(event));
return node;
}
function renderEventDetail(event) {
const detail = el("div", { class: "event-detail" });
const actions = el("div", { class: "detail-actions" });
if (event.caused_by_event_id) actions.appendChild(linkButton(`caused by ${shortId(event.caused_by_event_id)}`, event.caused_by_event_id));
for (const caused of event.caused_event_ids) actions.appendChild(linkButton(`caused ${shortId(caused)}`, caused));
detail.appendChild(actions);
const pairs = [
["event_id", event.event_id],
["run_id", event.run_id],
["created_at", event.created_at],
["caused_by", JSON.stringify(event.caused_by)],
["continuation_id", event.continuation_id || ""],
["version", event.event_version],
];
for (const field of event.summary) pairs.push([field.key, field.value]);
detail.appendChild(kvBlock("Event Details", pairs));
detail.appendChild(el("details", {},
el("summary", {}, "Raw event JSON"),
el("pre", {}, JSON.stringify(event.event, null, 2)),
));
return detail;
}
function eventSearchText(event) {
return [
event.event_id,
event.run_id,
event.root_run_id,
event.event_type,
event.event_family,
event.continuation_id || "",
event.label,
event.summary.map((field) => `${field.key}:${field.value}`).join(" "),
JSON.stringify(event.event),
].join(" ").toLowerCase();
}
function applyFilters() {
const query = document.getElementById("search").value.trim().toLowerCase();
const family = document.getElementById("family-filter").value;
const run = document.getElementById("run-filter").value;
const attention = document.getElementById("attention-filter").value;
for (const node of nodeById.values()) {
const matches =
(!query || node.dataset.search.includes(query)) &&
(!family || node.dataset.family === family) &&
(!run || node.dataset.runId === run) &&
(!attention || node.dataset.attention.split(" ").includes(attention));
node.classList.toggle("hidden", !matches);
}
}
function expandAttention() {
const ids = new Set(doc.attention.map((item) => item.event_id).filter(Boolean));
for (const id of ids) {
const node = nodeById.get(id);
if (node) node.classList.add("open");
}
const first = ids.values().next().value;
if (first) focusEvent(first);
}
function collapseAll() {
for (const node of nodeById.values()) node.classList.remove("open");
}
function renderSpans() {
const root = document.getElementById("spans");
root.replaceChildren(el("h2", { text: "Derived Spans" }));
if (!doc.spans.length) {
root.appendChild(el("p", { class: "subtle" }, "No known event pairs produced spans."));
return;
}
const table = tableFor(["kind", "run", "start", "end", "state", "label"]);
const tbody = table.querySelector("tbody");
for (const span of doc.spans) {
const tr = el("tr", {});
tr.appendChild(el("td", {}, span.kind));
tr.appendChild(el("td", { class: "mono" }, span.run_id));
tr.appendChild(el("td", {}, linkButton(shortId(span.start_event_id), span.start_event_id)));
const end = el("td", {});
if (span.end_event_id) end.appendChild(linkButton(shortId(span.end_event_id), span.end_event_id));
tr.appendChild(end);
tr.appendChild(el("td", {}, badge(span.open ? "open" : "closed", span.open ? "waiting" : "success")));
tr.appendChild(el("td", {}, span.label));
tbody.appendChild(tr);
}
root.appendChild(table);
}
function renderState() {
const root = document.getElementById("state");
root.replaceChildren(el("h2", { text: "Current State" }));
const tables = el("div", { class: "tables" });
tables.appendChild(stateTable("Workflows", doc.workflows, ["run_id", "status", "parent_run_id", "fork_name", "error_present", "quiescent"]));
tables.appendChild(stateTable("Continuations", doc.continuations, ["continuation_id", "workflow_run_id", "kind", "status", "provider", "provider_batch_id", "attempt", "error_present", "quiescent"]));
tables.appendChild(stateTable("Provider Batches", doc.provider_batches, ["provider_batch_id", "provider", "status", "request_count", "error_present", "quiescent"]));
tables.appendChild(stateTable("Fork Joins", doc.fork_joins, ["parent_run_id", "lhs_run_id", "rhs_run_id", "status", "error_present", "quiescent"]));
root.appendChild(tables);
}
function stateTable(title, rows, columns) {
const wrap = el("div", {});
wrap.appendChild(el("h3", { text: title }));
if (!rows.length) {
wrap.appendChild(el("p", { class: "subtle" }, "No rows."));
return wrap;
}
const table = tableFor(columns);
const tbody = table.querySelector("tbody");
for (const row of rows) {
const tr = el("tr", { title: timestampTitle(row) });
for (const column of columns) tr.appendChild(el("td", {}, row[column] == null ? "" : String(row[column])));
tbody.appendChild(tr);
}
wrap.appendChild(table);
return wrap;
}
function tableFor(columns) {
const table = el("table", {});
table.appendChild(el("thead", {}, el("tr", {}, ...columns.map((column) => el("th", {}, column)))));
table.appendChild(el("tbody", {}));
return table;
}
function renderRaw() {
const root = document.getElementById("raw");
root.replaceChildren(
el("h2", { text: "Embedded Document JSON" }),
el("button", { id: "copy-json", type: "button" }, "Copy JSON"),
el("details", {},
el("summary", {}, "Show embedded JSON"),
el("pre", { class: "raw-json" }, JSON.stringify(doc, null, 2)),
),
);
document.getElementById("copy-json").addEventListener("click", () => {
navigator.clipboard.writeText(JSON.stringify(doc, null, 2));
});
}
function downloadJson() {
const blob = new Blob([JSON.stringify(doc, null, 2)], { type: "application/json" });
const url = URL.createObjectURL(blob);
const anchor = el("a", { href: url, download: `${doc.root_run_id}-batch-visualization.json` });
document.body.appendChild(anchor);
anchor.click();
anchor.remove();
URL.revokeObjectURL(url);
}
function shortId(id) {
return String(id).slice(0, 8);
}
function statusSeverity(status) {
if (status === "failed") return "failed";
if (status === "blocked" || status === "blocked_human" || status === "blocked_openai") return "blocked";
if (["pending", "submitted", "waiting", "waiting_anthropic", "waiting_fork_join", "in_progress", "canceling"].includes(status)) return "waiting";
if (["halted", "succeeded", "resumed", "joined", "ended"].includes(status)) return "success";
return "normal";
}
renderOverview();
populateFilters();
renderLanes();
renderSpans();
renderState();
renderRaw();
if (location.hash.startsWith("#event-")) focusEvent(location.hash.slice("#event-".length));
})();
</script>
"##;
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn id(n: u128) -> Uuid {
Uuid::from_u128(n)
}
fn raw_event(
event_id: Uuid,
run_id: &str,
ordinal: i64,
event_type: &str,
caused_by: CausalRef,
event: Value,
created_at: &str,
) -> RawEvent {
RawEvent {
event_id,
root_run_id: "root".to_string(),
run_id: run_id.to_string(),
parent_run_id: None,
fork_name: None,
event_ordinal: ordinal,
caused_by,
event_type: event_type.to_string(),
event_version: 1,
continuation_id: event
.get("continuation_id")
.and_then(Value::as_str)
.map(ToString::to_string),
event,
created_at: created_at.to_string(),
}
}
fn workflow(
run_id: &str,
status: &str,
parent: Option<&str>,
fork: Option<&str>,
) -> VisualizationWorkflow {
VisualizationWorkflow {
run_id: run_id.to_string(),
root_run_id: "root".to_string(),
status: status.to_string(),
parent_run_id: parent.map(ToString::to_string),
fork_name: fork.map(ToString::to_string),
error_present: false,
error_sexpr: None,
next_event_ordinal: 1,
created_at: "2026-06-08 00:00:00+00".to_string(),
updated_at: "2026-06-08 00:00:00+00".to_string(),
quiescent: false,
quiesced_at: None,
workflow: None,
}
}
#[test]
fn projection_orders_parent_before_children_and_builds_reverse_causal_links() {
let e1 = id(1);
let e2 = id(2);
let e3 = id(3);
let mut child_event = raw_event(
e3,
"root:lhs",
0,
"workflow.enqueued",
CausalRef::EventId { event_id: e1 },
json!({"kind": "fork_branch"}),
"2026-06-08 00:00:02+00",
);
child_event.parent_run_id = Some("root".to_string());
child_event.fork_name = Some("lhs".to_string());
let document = project_visualization(
"root".to_string(),
"2026-06-08 00:00:03+00".to_string(),
VisualizationOptions::default(),
vec![
raw_event(
e1,
"root",
0,
"fork_join.started",
CausalRef::RunId {
run_id: "root".to_string(),
},
json!({"branch_run_id": {"lhs": "root:lhs"}}),
"2026-06-08 00:00:01+00",
),
raw_event(
e2,
"root",
1,
"workflow.halted",
CausalRef::EventId { event_id: e1 },
json!({"env_key_count": 3}),
"2026-06-08 00:00:03+00",
),
child_event,
],
vec![
workflow("root", "halted", None, None),
workflow("root:lhs", "halted", Some("root"), Some("lhs")),
],
Vec::new(),
Vec::new(),
Vec::new(),
);
assert_eq!(
document.lanes,
vec![
VisualizationLane {
run_id: "root".to_string(),
parent_run_id: None,
fork_name: None,
first_created_at: Some("2026-06-08 00:00:01+00".to_string()),
event_ids: vec![e1.to_string(), e2.to_string()],
current_status: Some("halted".to_string()),
warnings: Vec::new(),
},
VisualizationLane {
run_id: "root:lhs".to_string(),
parent_run_id: Some("root".to_string()),
fork_name: Some("lhs".to_string()),
first_created_at: Some("2026-06-08 00:00:02+00".to_string()),
event_ids: vec![e3.to_string()],
current_status: Some("halted".to_string()),
warnings: Vec::new(),
},
]
);
let fork = document
.events
.iter()
.find(|event| event.event_id == e1.to_string())
.expect("fork event");
assert_eq!(fork.caused_event_ids, vec![e3.to_string(), e2.to_string()]);
}
#[test]
fn projection_surfaces_integrity_warnings_and_attention() {
let e1 = id(11);
let missing_cause = id(12);
let document = project_visualization(
"root".to_string(),
"2026-06-08 00:00:03+00".to_string(),
VisualizationOptions::default(),
vec![raw_event(
e1,
"root",
0,
"anthropic.failed",
CausalRef::EventId {
event_id: missing_cause,
},
json!({
"continuation_id": "c1",
"provider_batch_id": "pb1",
"attempt": 1
}),
"2026-06-08 00:00:01+00",
)],
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
);
assert_eq!(
document.warnings,
vec![
VisualizationWarning {
code: "unknown-causal-event".to_string(),
message: format!("event {e1} references missing cause {missing_cause}"),
event_id: Some(e1.to_string()),
run_id: Some("root".to_string()),
continuation_id: None,
provider_batch_id: None,
},
VisualizationWarning {
code: "missing-continuation-row".to_string(),
message: format!("event {e1} references missing continuation c1"),
event_id: Some(e1.to_string()),
run_id: Some("root".to_string()),
continuation_id: Some("c1".to_string()),
provider_batch_id: None,
},
VisualizationWarning {
code: "missing-provider-batch-row".to_string(),
message: format!("event {e1} references missing provider batch pb1"),
event_id: Some(e1.to_string()),
run_id: Some("root".to_string()),
continuation_id: None,
provider_batch_id: Some("pb1".to_string()),
},
VisualizationWarning {
code: "missing-workflow-row".to_string(),
message: "run root has events but no current workflow row".to_string(),
event_id: None,
run_id: Some("root".to_string()),
continuation_id: None,
provider_batch_id: None,
},
]
);
assert_eq!(
document.attention,
vec![VisualizationAttention {
kind: "event".to_string(),
severity: "failed".to_string(),
label: "root #0 anthropic.failed".to_string(),
event_id: Some(e1.to_string()),
run_id: Some("root".to_string()),
continuation_id: Some("c1".to_string()),
provider_batch_id: Some("pb1".to_string()),
}]
);
}
#[test]
fn render_escapes_json_embedded_in_script_tag() {
let e1 = id(21);
let document = project_visualization(
"root".to_string(),
"2026-06-08 00:00:03+00".to_string(),
VisualizationOptions::default(),
vec![raw_event(
e1,
"root",
0,
"custom.event",
CausalRef::RunId {
run_id: "root".to_string(),
},
json!({"value": "</script><img src=x>"}),
"2026-06-08 00:00:01+00",
)],
vec![workflow("root", "runnable", None, None)],
Vec::new(),
Vec::new(),
Vec::new(),
);
let html = render_visualization_html(&document, HtmlOptions::default()).expect("render");
assert!(!html.contains("</script><img src=x>"));
assert!(html.contains("\\u003c/script\\u003e\\u003cimg src=x\\u003e"));
}
#[test]
fn derives_known_open_and_closed_spans() {
let e1 = id(31);
let e2 = id(32);
let e3 = id(33);
let document = project_visualization(
"root".to_string(),
"2026-06-08 00:00:03+00".to_string(),
VisualizationOptions::default(),
vec![
raw_event(
e1,
"root",
0,
"local_call.started",
CausalRef::RunId {
run_id: "root".to_string(),
},
json!({"function": "entry"}),
"2026-06-08 00:00:01+00",
),
raw_event(
e2,
"root",
1,
"local_call.completed",
CausalRef::EventId { event_id: e1 },
json!({"function": "entry"}),
"2026-06-08 00:00:02+00",
),
raw_event(
e3,
"root",
2,
"anthropic.suspended",
CausalRef::EventId { event_id: e2 },
json!({"continuation_id": "c1"}),
"2026-06-08 00:00:03+00",
),
],
vec![workflow("root", "waiting_anthropic", None, None)],
Vec::new(),
Vec::new(),
Vec::new(),
);
assert_eq!(
document.spans,
vec![
VisualizationSpan {
kind: "local_call".to_string(),
run_id: "root".to_string(),
start_event_id: e1.to_string(),
end_event_id: Some(e2.to_string()),
label: "local_call.started to local_call.completed".to_string(),
open: false,
},
VisualizationSpan {
kind: "anthropic:c1".to_string(),
run_id: "root".to_string(),
start_event_id: e3.to_string(),
end_event_id: None,
label: "anthropic.suspended is open".to_string(),
open: true,
},
]
);
}
}