use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{
Array, ArrayBuilder, ListBuilder, RecordBatch, StringArray, StringBuilder,
StructBuilder, TimestampMicrosecondArray,
};
use arrow::datatypes::{DataType, FieldRef, Fields, Schema as ArrowSchema, TimeUnit};
use chrono::{DateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use uuid::Uuid;
use super::iceberg::{IcebergWarehouse, TABLE_FUNNEL_EVENTS, append_batch, ensure_table_schema};
use crate::funnel::event::{
CommitRef, Event, ItemKind, NodeStatus, PlanStatus, RunOutcome, TriageDecision,
};
use crate::funnel::ids::{IdeaId, NodeId, PlanId, RunId};
// Positional indices of the funnel_events columns in the Arrow schema derived from the
// table's current Iceberg schema. `build_batch` writes and `parse_batch` reads columns by
// these positions (never by name), so they must follow the field order produced by
// `super::iceberg_schema::funnel_events()`.
// NOTE(review): that field order is defined outside this file — keep both in sync when
// adding or reordering columns.
const COL_EVENT_ID: usize = 0;
const COL_TS_MICROS: usize = 1;
const COL_KIND: usize = 2;
const COL_IDEA_ID: usize = 3;
const COL_PLAN_ID: usize = 4;
const COL_NODE_ID: usize = 5;
const COL_RUN_ID: usize = 6;
const COL_FROM_NODE: usize = 7;
const COL_TO_NODE: usize = 8;
const COL_SOURCE: usize = 9;
const COL_TEXT: usize = 10;
// List<Utf8> column.
const COL_REFS: usize = 11;
const COL_DECISION: usize = 12;
const COL_NODE_STATUS: usize = 13;
const COL_PLAN_STATUS: usize = 14;
const COL_WHY: usize = 15;
const COL_SUMMARY: usize = 16;
const COL_PLANNER: usize = 17;
const COL_NODE_KIND: usize = 18;
// List<Utf8> column.
const COL_TARGETS: usize = 19;
const COL_PROMPT_EXCERPT: usize = 20;
// `NodeAdded` params serialised as a JSON object string (null when params are empty).
const COL_PARAMS_JSON: usize = 21;
const COL_RAN_BY: usize = 22;
const COL_OUTCOME: usize = 23;
const COL_LOG_REF: usize = 24;
// List<Struct<repo, sha>> column.
const COL_PRODUCED_COMMITS: usize = 25;
// List<Utf8> column.
const COL_PRODUCED_TEST_RUNS: usize = 26;
// Last column. Tables created before it existed have only 27 columns; they are evolved
// to include it on the next `append_event`, and readers treat it as optional.
const COL_ITEM_KIND: usize = 27;
// Values stored in the `kind` column, one per `Event` variant. They are written by
// `event_kind_str` and matched by `parse_batch`, so changing one breaks reading old rows.
const KIND_IDEA_SUBMITTED: &str = "IdeaSubmitted";
const KIND_IDEA_TRIAGED: &str = "IdeaTriaged";
const KIND_PLAN_CREATED: &str = "PlanCreated";
const KIND_NODE_ADDED: &str = "NodeAdded";
const KIND_EDGE_ADDED: &str = "EdgeAdded";
const KIND_NODE_STATUS_CHANGED: &str = "NodeStatusChanged";
const KIND_RUN_RECORDED: &str = "RunRecorded";
const KIND_PLAN_STATUS_CHANGED: &str = "PlanStatusChanged";
/// Appends a single funnel `event` to the funnel events table in `wh`.
///
/// The table is loaded and its schema is brought up to date with
/// `super::iceberg_schema::funnel_events()` via `ensure_table_schema`. The event is then
/// encoded against the resulting schema and appended as a one-row batch.
///
/// # Errors
/// Propagates failures from loading the table, building the expected schema, schema
/// evolution, batch encoding, or the append itself.
pub async fn append_event(wh: &IcebergWarehouse, event: &Event) -> Result<()> {
    let ident = wh.table_ident(TABLE_FUNNEL_EVENTS);
    let current = wh.catalog().load_table(&ident).await?;
    let expected = super::iceberg_schema::funnel_events()?;
    let evolved = ensure_table_schema(wh.catalog(), &ident, current, &expected).await?;
    let one_row = build_batch(&evolved, std::slice::from_ref(event))?;
    append_batch(wh.catalog(), evolved, one_row).await
}
pub async fn load_all_events(wh: &IcebergWarehouse) -> Result<Vec<Event>> {
let table = wh.catalog().load_table(&wh.table_ident(TABLE_FUNNEL_EVENTS)).await?;
let scan = table.scan().build()?;
let stream = scan.to_arrow().await?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
let mut events: Vec<Event> = Vec::new();
for batch in &batches {
let chunk = parse_batch(batch)?;
events.extend(chunk);
}
events.sort_by_key(|e| e.ts());
Ok(events)
}
fn build_batch(
table: &iceberg::table::Table,
events: &[Event],
) -> Result<RecordBatch> {
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let refs_elem = list_element_field(&schema, COL_REFS)?;
let targets_elem = list_element_field(&schema, COL_TARGETS)?;
let test_runs_elem = list_element_field(&schema, COL_PRODUCED_TEST_RUNS)?;
let commits_elem = list_element_field(&schema, COL_PRODUCED_COMMITS)?;
let commit_struct_fields: Fields = match commits_elem.data_type() {
DataType::Struct(fs) => fs.clone(),
other => return Err(anyhow!("produced_commits element not Struct: {other:?}")),
};
let mut event_ids: Vec<String> = Vec::with_capacity(events.len());
let mut ts_vals: Vec<i64> = Vec::with_capacity(events.len());
let mut kinds: Vec<String> = Vec::with_capacity(events.len());
let mut idea_ids: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut plan_ids: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut node_ids: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut run_ids: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut from_nodes: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut to_nodes: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut sources: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut texts: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut decisions: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut node_statuses: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut plan_statuses: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut whys: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut summaries: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut planners: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut node_kinds: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut prompt_excerpts: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut params_jsons: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut ran_bys: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut outcomes: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut log_refs: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut item_kinds: Vec<Option<String>> = Vec::with_capacity(events.len());
let mut refs_b: ListBuilder<StringBuilder> =
ListBuilder::new(StringBuilder::new()).with_field(refs_elem);
let mut targets_b: ListBuilder<StringBuilder> =
ListBuilder::new(StringBuilder::new()).with_field(targets_elem);
let mut test_runs_b: ListBuilder<StringBuilder> =
ListBuilder::new(StringBuilder::new()).with_field(test_runs_elem);
let commit_struct_builder = StructBuilder::new(
commit_struct_fields.clone(),
vec![
Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
Box::new(StringBuilder::new()) as Box<dyn ArrayBuilder>,
],
);
let mut commits_b: ListBuilder<StructBuilder> =
ListBuilder::new(commit_struct_builder).with_field(commits_elem);
for ev in events {
event_ids.push(Uuid::new_v4().to_string());
ts_vals.push(ev.ts().timestamp_micros());
kinds.push(event_kind_str(ev).to_string());
let mut idea_id = None;
let mut plan_id = None;
let mut node_id = None;
let mut run_id = None;
let mut from_node = None;
let mut to_node = None;
let mut source = None;
let mut text = None;
let mut decision = None;
let mut node_status = None;
let mut plan_status = None;
let mut why = None;
let mut summary = None;
let mut planner = None;
let mut node_kind = None;
let mut prompt_excerpt = None;
let mut params_json = None;
let mut ran_by = None;
let mut outcome = None;
let mut log_ref = None;
let mut item_kind: Option<String> = None;
let mut refs_list: Option<&Vec<String>> = None;
let mut targets_list: Option<&Vec<String>> = None;
let mut test_runs_list: Option<&Vec<String>> = None;
let mut commits_list: Option<&Vec<CommitRef>> = None;
match ev {
Event::IdeaSubmitted { id, source: s, text: t, refs, item_kind: ik, .. } => {
idea_id = Some(id.as_str().to_string());
source = Some(s.clone());
text = Some(t.clone());
refs_list = Some(refs);
item_kind = Some(ik.as_str().to_string());
}
Event::IdeaTriaged { idea_id: iid, decision: d, why: w, .. } => {
idea_id = Some(iid.as_str().to_string());
decision = Some(triage_str(*d).to_string());
why = w.clone();
}
Event::PlanCreated { id, idea_id: iid, summary: s, planner: p, .. } => {
plan_id = Some(id.as_str().to_string());
idea_id = Some(iid.as_str().to_string());
summary = Some(s.clone());
planner = Some(p.clone());
}
Event::NodeAdded { plan_id: pid, node_id: nid, kind, params, targets, prompt_excerpt: pe, .. } => {
plan_id = Some(pid.as_str().to_string());
node_id = Some(nid.as_str().to_string());
node_kind = Some(kind.clone());
prompt_excerpt = pe.clone();
targets_list = Some(targets);
if !params.is_empty() {
params_json = Some(serde_json::to_string(params).unwrap_or_default());
}
}
Event::EdgeAdded { plan_id: pid, from_node: f, to_node: t, .. } => {
plan_id = Some(pid.as_str().to_string());
from_node = Some(f.as_str().to_string());
to_node = Some(t.as_str().to_string());
}
Event::NodeStatusChanged { plan_id: pid, node_id: nid, status, why: w, .. } => {
plan_id = Some(pid.as_str().to_string());
node_id = Some(nid.as_str().to_string());
node_status = Some(node_status_str(*status).to_string());
why = w.clone();
}
Event::RunRecorded { plan_id: pid, node_id: nid, run_id: rid, ran_by: rb, outcome: o, log_ref: lr, produced_commits, produced_test_runs, .. } => {
plan_id = Some(pid.as_str().to_string());
node_id = Some(nid.as_str().to_string());
run_id = Some(rid.as_str().to_string());
ran_by = Some(rb.clone());
outcome = Some(run_outcome_str(*o).to_string());
log_ref = lr.clone();
commits_list = Some(produced_commits);
test_runs_list = Some(produced_test_runs);
}
Event::PlanStatusChanged { plan_id: pid, status, why: w, .. } => {
plan_id = Some(pid.as_str().to_string());
plan_status = Some(plan_status_str(*status).to_string());
why = w.clone();
}
}
idea_ids.push(idea_id);
plan_ids.push(plan_id);
node_ids.push(node_id);
run_ids.push(run_id);
from_nodes.push(from_node);
to_nodes.push(to_node);
sources.push(source);
texts.push(text);
decisions.push(decision);
node_statuses.push(node_status);
plan_statuses.push(plan_status);
whys.push(why);
summaries.push(summary);
planners.push(planner);
node_kinds.push(node_kind);
prompt_excerpts.push(prompt_excerpt);
params_jsons.push(params_json);
ran_bys.push(ran_by);
outcomes.push(outcome);
log_refs.push(log_ref);
item_kinds.push(item_kind);
append_string_list(&mut refs_b, refs_list);
append_string_list(&mut targets_b, targets_list);
append_string_list(&mut test_runs_b, test_runs_list);
append_commit_list(&mut commits_b, commits_list);
}
let n_fields = schema.fields().len();
let mut cols: Vec<Arc<dyn Array>> =
vec![Arc::new(StringArray::from(Vec::<String>::new())); n_fields];
cols[COL_EVENT_ID] = Arc::new(StringArray::from(event_ids));
cols[COL_TS_MICROS] = Arc::new(
TimestampMicrosecondArray::from(ts_vals).with_timezone("+00:00"),
);
cols[COL_KIND] = Arc::new(StringArray::from(kinds));
cols[COL_IDEA_ID] = Arc::new(StringArray::from(idea_ids));
cols[COL_PLAN_ID] = Arc::new(StringArray::from(plan_ids));
cols[COL_NODE_ID] = Arc::new(StringArray::from(node_ids));
cols[COL_RUN_ID] = Arc::new(StringArray::from(run_ids));
cols[COL_FROM_NODE] = Arc::new(StringArray::from(from_nodes));
cols[COL_TO_NODE] = Arc::new(StringArray::from(to_nodes));
cols[COL_SOURCE] = Arc::new(StringArray::from(sources));
cols[COL_TEXT] = Arc::new(StringArray::from(texts));
cols[COL_REFS] = Arc::new(refs_b.finish());
cols[COL_DECISION] = Arc::new(StringArray::from(decisions));
cols[COL_NODE_STATUS] = Arc::new(StringArray::from(node_statuses));
cols[COL_PLAN_STATUS] = Arc::new(StringArray::from(plan_statuses));
cols[COL_WHY] = Arc::new(StringArray::from(whys));
cols[COL_SUMMARY] = Arc::new(StringArray::from(summaries));
cols[COL_PLANNER] = Arc::new(StringArray::from(planners));
cols[COL_NODE_KIND] = Arc::new(StringArray::from(node_kinds));
cols[COL_TARGETS] = Arc::new(targets_b.finish());
cols[COL_PROMPT_EXCERPT] = Arc::new(StringArray::from(prompt_excerpts));
cols[COL_PARAMS_JSON] = Arc::new(StringArray::from(params_jsons));
cols[COL_RAN_BY] = Arc::new(StringArray::from(ran_bys));
cols[COL_OUTCOME] = Arc::new(StringArray::from(outcomes));
cols[COL_LOG_REF] = Arc::new(StringArray::from(log_refs));
cols[COL_PRODUCED_COMMITS] = Arc::new(commits_b.finish());
cols[COL_PRODUCED_TEST_RUNS] = Arc::new(test_runs_b.finish());
if n_fields > COL_ITEM_KIND {
cols[COL_ITEM_KIND] = Arc::new(StringArray::from(item_kinds));
}
let _ = TimeUnit::Microsecond; Ok(RecordBatch::try_new(schema, cols)?)
}
/// Appends one list slot to `b`.
///
/// `None` produces a null list entry. `Some(items)` appends every string and then closes
/// a valid (possibly empty) list entry.
fn append_string_list(b: &mut ListBuilder<StringBuilder>, list: Option<&Vec<String>>) {
    let Some(items) = list else {
        b.append(false);
        return;
    };
    let values = b.values();
    items.iter().for_each(|item| values.append_value(item));
    b.append(true);
}
/// Returns the element field of the `List` column at position `idx` in `schema`.
///
/// Only 32-bit-offset `List` is accepted. The writer builds these columns with
/// `ListBuilder` and the reader downcasts them to `ListArray`, and neither handles
/// `LargeList`. Rejecting it here gives a clear error up front instead of a type
/// mismatch later when the batch is assembled or read.
///
/// # Errors
/// Fails if `schema` has no column at `idx` or the column is not a `List`.
fn list_element_field(schema: &ArrowSchema, idx: usize) -> Result<FieldRef> {
    let field = schema
        .fields()
        .get(idx)
        .ok_or_else(|| anyhow!("schema has no column at idx {idx}"))?;
    match field.data_type() {
        DataType::List(elem) => Ok(elem.clone()),
        other => Err(anyhow!(
            "column `{}` (idx {idx}) expected List, got {other:?}",
            field.name()
        )),
    }
}
/// Appends one `produced_commits` list slot to `b`.
///
/// `None` produces a null list entry. `Some(commits)` appends one struct per commit and
/// then closes a valid list entry. The struct's child builders are addressed by position
/// (0 = repo, 1 = sha) and must be `StringBuilder`s; otherwise this panics.
fn append_commit_list(b: &mut ListBuilder<StructBuilder>, list: Option<&Vec<CommitRef>>) {
    let Some(commits) = list else {
        b.append(false);
        return;
    };
    let entries: &mut StructBuilder = b.values();
    for commit in commits {
        let repo_col = entries
            .field_builder::<StringBuilder>(0)
            .expect("repo builder");
        repo_col.append_value(&commit.repo);
        let sha_col = entries
            .field_builder::<StringBuilder>(1)
            .expect("sha builder");
        sha_col.append_value(&commit.sha);
        entries.append(true);
    }
    b.append(true);
}
fn parse_batch(batch: &RecordBatch) -> Result<Vec<Event>> {
let kinds = string_col(batch, COL_KIND, "kind")?;
let ts_arr = batch
.column(COL_TS_MICROS)
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.ok_or_else(|| anyhow!("ts_micros column not Timestamp(us)"))?;
let idea_ids = string_opt_col(batch, COL_IDEA_ID);
let plan_ids = string_opt_col(batch, COL_PLAN_ID);
let node_ids = string_opt_col(batch, COL_NODE_ID);
let run_ids = string_opt_col(batch, COL_RUN_ID);
let from_nodes = string_opt_col(batch, COL_FROM_NODE);
let to_nodes = string_opt_col(batch, COL_TO_NODE);
let sources = string_opt_col(batch, COL_SOURCE);
let texts = string_opt_col(batch, COL_TEXT);
let decisions = string_opt_col(batch, COL_DECISION);
let node_statuses = string_opt_col(batch, COL_NODE_STATUS);
let plan_statuses = string_opt_col(batch, COL_PLAN_STATUS);
let whys = string_opt_col(batch, COL_WHY);
let summaries = string_opt_col(batch, COL_SUMMARY);
let planners = string_opt_col(batch, COL_PLANNER);
let node_kinds = string_opt_col(batch, COL_NODE_KIND);
let prompt_excerpts = string_opt_col(batch, COL_PROMPT_EXCERPT);
let params_jsons = string_opt_col(batch, COL_PARAMS_JSON);
let ran_bys = string_opt_col(batch, COL_RAN_BY);
let outcomes = string_opt_col(batch, COL_OUTCOME);
let log_refs = string_opt_col(batch, COL_LOG_REF);
let item_kinds = string_opt_col_if_present(batch, COL_ITEM_KIND);
let refs_lists = string_list_col(batch, COL_REFS)?;
let targets_lists = string_list_col(batch, COL_TARGETS)?;
let test_runs_lists = string_list_col(batch, COL_PRODUCED_TEST_RUNS)?;
let commits_lists = commit_list_col(batch, COL_PRODUCED_COMMITS)?;
let mut out: Vec<Event> = Vec::with_capacity(batch.num_rows());
for i in 0..batch.num_rows() {
let ts = micros_to_dt(ts_arr.value(i));
let kind = kinds.value(i);
let ev = match kind {
KIND_IDEA_SUBMITTED => Event::IdeaSubmitted {
id: IdeaId::new(idea_ids[i].clone().context("IdeaSubmitted needs idea_id")?),
source: sources[i].clone().unwrap_or_default(),
text: texts[i].clone().unwrap_or_default(),
refs: refs_lists[i].clone().unwrap_or_default(),
item_kind: item_kinds
.get(i)
.and_then(|o| o.as_deref())
.map(ItemKind::parse)
.unwrap_or_default(),
ts,
},
KIND_IDEA_TRIAGED => Event::IdeaTriaged {
idea_id: IdeaId::new(idea_ids[i].clone().context("IdeaTriaged needs idea_id")?),
decision: parse_triage(decisions[i].as_deref().unwrap_or(""))?,
why: whys[i].clone(),
ts,
},
KIND_PLAN_CREATED => Event::PlanCreated {
id: PlanId::new(plan_ids[i].clone().context("PlanCreated needs plan_id")?),
idea_id: IdeaId::new(idea_ids[i].clone().context("PlanCreated needs idea_id")?),
summary: summaries[i].clone().unwrap_or_default(),
planner: planners[i].clone().unwrap_or_default(),
ts,
},
KIND_NODE_ADDED => Event::NodeAdded {
plan_id: PlanId::new(plan_ids[i].clone().context("NodeAdded needs plan_id")?),
node_id: NodeId::new(node_ids[i].clone().context("NodeAdded needs node_id")?),
kind: node_kinds[i].clone().unwrap_or_default(),
params: match params_jsons[i].as_deref() {
Some(s) if !s.is_empty() => serde_json::from_str(s).unwrap_or_default(),
_ => serde_json::Map::new(),
},
targets: targets_lists[i].clone().unwrap_or_default(),
prompt_excerpt: prompt_excerpts[i].clone(),
ts,
},
KIND_EDGE_ADDED => Event::EdgeAdded {
plan_id: PlanId::new(plan_ids[i].clone().context("EdgeAdded needs plan_id")?),
from_node: NodeId::new(from_nodes[i].clone().context("EdgeAdded needs from_node")?),
to_node: NodeId::new(to_nodes[i].clone().context("EdgeAdded needs to_node")?),
ts,
},
KIND_NODE_STATUS_CHANGED => Event::NodeStatusChanged {
plan_id: PlanId::new(plan_ids[i].clone().context("NodeStatusChanged needs plan_id")?),
node_id: NodeId::new(node_ids[i].clone().context("NodeStatusChanged needs node_id")?),
status: parse_node_status(node_statuses[i].as_deref().unwrap_or(""))?,
why: whys[i].clone(),
ts,
},
KIND_RUN_RECORDED => Event::RunRecorded {
plan_id: PlanId::new(plan_ids[i].clone().context("RunRecorded needs plan_id")?),
node_id: NodeId::new(node_ids[i].clone().context("RunRecorded needs node_id")?),
run_id: RunId::new(run_ids[i].clone().context("RunRecorded needs run_id")?),
ran_by: ran_bys[i].clone().unwrap_or_default(),
outcome: parse_run_outcome(outcomes[i].as_deref().unwrap_or(""))?,
log_ref: log_refs[i].clone(),
produced_commits: commits_lists[i].clone().unwrap_or_default(),
produced_test_runs: test_runs_lists[i].clone().unwrap_or_default(),
ts,
},
KIND_PLAN_STATUS_CHANGED => Event::PlanStatusChanged {
plan_id: PlanId::new(plan_ids[i].clone().context("PlanStatusChanged needs plan_id")?),
status: parse_plan_status(plan_statuses[i].as_deref().unwrap_or(""))?,
why: whys[i].clone(),
ts,
},
other => return Err(anyhow!("unknown funnel event kind `{other}`")),
};
out.push(ev);
}
Ok(out)
}
fn string_col<'a>(batch: &'a RecordBatch, idx: usize, name: &str) -> Result<&'a StringArray> {
batch
.column(idx)
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| anyhow!("column `{name}` (idx {idx}) is not StringArray"))
}
fn string_opt_col(batch: &RecordBatch, idx: usize) -> Vec<Option<String>> {
let arr = batch
.column(idx)
.as_any()
.downcast_ref::<StringArray>()
.expect("optional string column");
(0..arr.len())
.map(|i| if arr.is_null(i) { None } else { Some(arr.value(i).to_string()) })
.collect()
}
fn string_opt_col_if_present(batch: &RecordBatch, idx: usize) -> Vec<Option<String>> {
if idx >= batch.num_columns() {
return vec![None; batch.num_rows()];
}
match batch.column(idx).as_any().downcast_ref::<StringArray>() {
Some(arr) => (0..arr.len())
.map(|i| if arr.is_null(i) { None } else { Some(arr.value(i).to_string()) })
.collect(),
None => vec![None; batch.num_rows()],
}
}
/// Decodes the `List<Utf8>` column at `idx` into one entry per row.
///
/// A null list becomes `None`. Null elements inside a non-null list are skipped.
///
/// # Errors
/// Fails if the column is not a `ListArray` or a list's values are not a `StringArray`.
fn string_list_col(batch: &RecordBatch, idx: usize) -> Result<Vec<Option<Vec<String>>>> {
    use arrow::array::ListArray;
    let lists = batch
        .column(idx)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| anyhow!("column {idx} is not ListArray"))?;
    (0..lists.len())
        .map(|row| -> Result<Option<Vec<String>>> {
            if lists.is_null(row) {
                return Ok(None);
            }
            let entry = lists.value(row);
            let strs = entry
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| anyhow!("list element not StringArray"))?;
            // `flatten` drops null elements.
            Ok(Some(strs.iter().flatten().map(str::to_string).collect()))
        })
        .collect()
}
/// Decodes the `List<Struct<repo, sha>>` column at `idx` into one entry per row.
///
/// A null list becomes `None`. Struct children are looked up by name (`repo`, `sha`).
/// NOTE(review): a null `repo`/`sha` value is read through `StringArray::value`, which
/// presumably yields an empty string rather than an error — confirm if nulls are possible.
///
/// # Errors
/// Fails if the column is not a `ListArray`, its values are not a `StructArray`, or the
/// struct lacks a `Utf8` `repo` or `sha` child.
fn commit_list_col(batch: &RecordBatch, idx: usize) -> Result<Vec<Option<Vec<CommitRef>>>> {
    use arrow::array::{ListArray, StructArray};
    let lists = batch
        .column(idx)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| anyhow!("column {idx} (produced_commits) is not ListArray"))?;
    (0..lists.len())
        .map(|row| -> Result<Option<Vec<CommitRef>>> {
            if lists.is_null(row) {
                return Ok(None);
            }
            let entry = lists.value(row);
            let commits = entry
                .as_any()
                .downcast_ref::<StructArray>()
                .ok_or_else(|| anyhow!("produced_commits element not StructArray"))?;
            let repo_col = commits
                .column_by_name("repo")
                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
                .ok_or_else(|| anyhow!("commit struct missing repo"))?;
            let sha_col = commits
                .column_by_name("sha")
                .and_then(|c| c.as_any().downcast_ref::<StringArray>())
                .ok_or_else(|| anyhow!("commit struct missing sha"))?;
            let decoded = (0..commits.len())
                .map(|j| CommitRef {
                    repo: repo_col.value(j).to_string(),
                    sha: sha_col.value(j).to_string(),
                })
                .collect();
            Ok(Some(decoded))
        })
        .collect()
}
fn event_kind_str(ev: &Event) -> &'static str {
match ev {
Event::IdeaSubmitted { .. } => KIND_IDEA_SUBMITTED,
Event::IdeaTriaged { .. } => KIND_IDEA_TRIAGED,
Event::PlanCreated { .. } => KIND_PLAN_CREATED,
Event::NodeAdded { .. } => KIND_NODE_ADDED,
Event::EdgeAdded { .. } => KIND_EDGE_ADDED,
Event::NodeStatusChanged { .. } => KIND_NODE_STATUS_CHANGED,
Event::RunRecorded { .. } => KIND_RUN_RECORDED,
Event::PlanStatusChanged { .. } => KIND_PLAN_STATUS_CHANGED,
}
}
fn triage_str(d: TriageDecision) -> &'static str {
match d {
TriageDecision::Accept => "accept",
TriageDecision::Drop => "drop",
}
}
fn parse_triage(s: &str) -> Result<TriageDecision> {
match s {
"accept" => Ok(TriageDecision::Accept),
"drop" => Ok(TriageDecision::Drop),
other => Err(anyhow!("bad triage decision `{other}`")),
}
}
fn node_status_str(s: NodeStatus) -> &'static str {
match s {
NodeStatus::Pending => "pending",
NodeStatus::Ready => "ready",
NodeStatus::InProgress => "in_progress",
NodeStatus::Done => "done",
NodeStatus::Blocked => "blocked",
NodeStatus::Failed => "failed",
}
}
fn parse_node_status(s: &str) -> Result<NodeStatus> {
match s {
"pending" => Ok(NodeStatus::Pending),
"ready" => Ok(NodeStatus::Ready),
"in_progress" => Ok(NodeStatus::InProgress),
"done" => Ok(NodeStatus::Done),
"blocked" => Ok(NodeStatus::Blocked),
"failed" => Ok(NodeStatus::Failed),
other => Err(anyhow!("bad node status `{other}`")),
}
}
fn plan_status_str(s: PlanStatus) -> &'static str {
match s {
PlanStatus::Draft => "draft",
PlanStatus::Active => "active",
PlanStatus::Done => "done",
PlanStatus::Abandoned => "abandoned",
}
}
fn parse_plan_status(s: &str) -> Result<PlanStatus> {
match s {
"draft" => Ok(PlanStatus::Draft),
"active" => Ok(PlanStatus::Active),
"done" => Ok(PlanStatus::Done),
"abandoned" => Ok(PlanStatus::Abandoned),
other => Err(anyhow!("bad plan status `{other}`")),
}
}
fn run_outcome_str(o: RunOutcome) -> &'static str {
match o {
RunOutcome::Ok => "ok",
RunOutcome::Failed => "failed",
RunOutcome::Aborted => "aborted",
}
}
fn parse_run_outcome(s: &str) -> Result<RunOutcome> {
match s {
"ok" => Ok(RunOutcome::Ok),
"failed" => Ok(RunOutcome::Failed),
"aborted" => Ok(RunOutcome::Aborted),
other => Err(anyhow!("bad run outcome `{other}`")),
}
}
fn micros_to_dt(micros: i64) -> DateTime<Utc> {
let secs = micros.div_euclid(1_000_000);
let nanos = (micros.rem_euclid(1_000_000) * 1_000) as u32;
Utc.timestamp_opt(secs, nanos).single().unwrap_or_else(Utc::now)
}
// Round-trip tests: events are written with `append_event` and read back with
// `load_all_events` against a warehouse opened in a temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
/// Appends one event of every `Event` variant (plus a second, `Error`-kind idea) and
/// checks that the row count, idea refs, item kinds and produced commits survive the
/// round trip.
#[test]
fn roundtrip_all_event_kinds() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
// Every event shares this timestamp, so the test does not assert on load order.
let t = Utc.with_ymd_and_hms(2026, 5, 31, 12, 0, 0).unwrap();
let events = vec![
Event::IdeaSubmitted {
id: IdeaId::seq(1),
source: "test".into(),
text: "first idea".into(),
refs: vec!["a".into(), "b".into()],
item_kind: ItemKind::Idea,
ts: t,
},
// Second idea with a non-default item kind and an empty refs list.
Event::IdeaSubmitted {
id: IdeaId::seq(2),
source: "test".into(),
text: "panic in foo.rs".into(),
refs: vec![],
item_kind: ItemKind::Error,
ts: t,
},
Event::PlanCreated {
id: PlanId::seq(1),
idea_id: IdeaId::seq(1),
summary: "the plan".into(),
planner: "tester".into(),
ts: t,
},
Event::NodeAdded {
plan_id: PlanId::seq(1),
node_id: NodeId::seq(1),
kind: "rust:impl".into(),
params: serde_json::Map::new(),
targets: vec!["src/foo.rs".into()],
prompt_excerpt: Some("do the thing".into()),
ts: t,
},
Event::EdgeAdded {
plan_id: PlanId::seq(1),
from_node: NodeId::seq(1),
to_node: NodeId::seq(2),
ts: t,
},
Event::NodeStatusChanged {
plan_id: PlanId::seq(1),
node_id: NodeId::seq(1),
status: NodeStatus::Done,
why: Some("done".into()),
ts: t,
},
Event::RunRecorded {
plan_id: PlanId::seq(1),
node_id: NodeId::seq(1),
run_id: RunId::seq(1),
ran_by: "tester".into(),
outcome: RunOutcome::Ok,
log_ref: None,
produced_commits: vec![CommitRef { repo: "nornir".into(), sha: "abc123".into() }],
produced_test_runs: vec!["smoke-1".into()],
ts: t,
},
Event::PlanStatusChanged {
plan_id: PlanId::seq(1),
status: PlanStatus::Active,
why: None,
ts: t,
},
];
// One append (and therefore one single-row batch) per event.
wh.block_on(async {
for ev in &events {
append_event(&wh, ev).await.unwrap();
}
});
let loaded = wh.block_on(async { load_all_events(&wh).await.unwrap() });
assert_eq!(loaded.len(), events.len(), "round-trip count");
// Flags record that each check below actually ran, so a dropped variant fails the test.
let mut saw_refs = false;
let mut saw_commits = false;
let mut saw_error_kind = false;
let mut saw_idea_kind = false;
for ev in &loaded {
if let Event::IdeaSubmitted { refs, item_kind, id, .. } = ev {
// Ideas are identified by their id's string form; each must keep its item kind.
match (id.as_str(), item_kind) {
("i-001", ItemKind::Idea) => {
assert_eq!(refs.len(), 2);
saw_refs = true;
saw_idea_kind = true;
}
("i-002", ItemKind::Error) => saw_error_kind = true,
other => panic!("unexpected idea kind: {other:?}"),
}
}
if let Event::RunRecorded { produced_commits, .. } = ev {
assert_eq!(produced_commits.len(), 1);
assert_eq!(produced_commits[0].repo, "nornir");
saw_commits = true;
}
}
assert!(saw_refs && saw_commits, "refs and commits should round-trip");
assert!(saw_idea_kind && saw_error_kind, "both item_kinds round-trip");
}
/// Recreates the funnel table with the 27-column schema that predates `item_kind`, then
/// checks that `append_event` evolves it to 28 columns and that the appended event's
/// `item_kind` round-trips.
#[test]
fn stale_27col_funnel_table_evolves_and_kind_round_trips() {
use iceberg::Catalog;
use iceberg::spec::Schema;
use iceberg::TableCreation;
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let ident = wh.table_ident(TABLE_FUNNEL_EVENTS);
wh.block_on(async {
let cat = wh.catalog();
// Drop the existing funnel table (the `unwrap` implies one already exists after
// `open`) so it can be recreated with the stale schema.
cat.drop_table(&ident).await.unwrap();
// Stale schema = current funnel_events schema with the `item_kind` field removed.
let full = crate::warehouse::iceberg_schema::funnel_events().unwrap();
let fields: Vec<_> = full
.as_struct()
.fields()
.iter()
.filter(|f| f.name != "item_kind")
.cloned()
.collect();
let stale = Schema::builder().with_schema_id(0).with_fields(fields).build().unwrap();
assert_eq!(stale.as_struct().fields().len(), 27);
let creation =
TableCreation::builder().name(ident.name().to_string()).schema(stale).build();
cat.create_table(ident.namespace(), creation).await.unwrap();
});
let ev = Event::IdeaSubmitted {
id: IdeaId::seq(1),
source: "test".into(),
text: "stale-table error report".into(),
refs: vec![],
item_kind: ItemKind::Error,
ts: Utc::now(),
};
// Appending against the stale table must trigger schema evolution.
wh.block_on(async { append_event(&wh, &ev).await.unwrap() });
wh.block_on(async {
let t = wh.catalog().load_table(&ident).await.unwrap();
let names: Vec<&str> =
t.metadata().current_schema().as_struct().fields().iter().map(|f| f.name.as_str()).collect();
assert_eq!(names.len(), 28);
assert!(names.contains(&"item_kind"));
});
let loaded = wh.block_on(async { load_all_events(&wh).await.unwrap() });
assert_eq!(loaded.len(), 1);
// The non-default kind must survive, i.e. it was written into the evolved column.
match &loaded[0] {
Event::IdeaSubmitted { item_kind, .. } => assert_eq!(*item_kind, ItemKind::Error),
other => panic!("expected IdeaSubmitted, got {other:?}"),
}
}
}