1use std::collections::HashMap;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use async_trait::async_trait;
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use tokio::fs;
16use tokio::sync::RwLock;
17
18use crate::error::Result;
19use crate::event::WorkflowEvent;
20
/// A persisted record of one workflow execution: lifecycle status, timing,
/// token/cost accounting, and a per-node breakdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionRecord {
    /// Unique execution id (a UUID string when created by `ExecutionObserver`).
    pub id: String,
    /// Id of the workflow definition that was executed.
    pub workflow_id: String,
    /// Human-readable workflow name (may be empty if unknown at start).
    pub workflow_name: String,
    /// Current lifecycle state of the execution.
    pub status: ExecutionStatus,
    /// Start time, in milliseconds since the UNIX epoch.
    pub started_at: u64,
    /// Finish time (epoch ms); `None` while still running.
    pub finished_at: Option<u64>,
    /// Total wall-clock duration in milliseconds (0 until completion).
    pub duration_ms: u64,
    /// Total tokens consumed across the whole execution.
    pub total_tokens: u64,
    /// Sum of input (prompt) tokens across all nodes.
    pub total_input_tokens: u64,
    /// Sum of output (completion) tokens across all nodes.
    pub total_output_tokens: u64,
    /// Estimated total cost in US dollars.
    pub estimated_cost_usd: f64,
    /// The input payload the workflow was invoked with.
    pub input: Value,
    /// Per-node execution details, in node-start order.
    pub node_records: Vec<NodeRecord>,
    /// Top-level error message when the execution failed.
    pub error: Option<String>,
    /// How this execution was initiated.
    pub trigger: TriggerType,
    /// Free-form additional metadata.
    pub metadata: HashMap<String, Value>,
}
61
/// Lifecycle state of a workflow execution.
///
/// Serialized in `snake_case` (e.g. `"running"`, `"success"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionStatus {
    /// Execution is in progress.
    Running,
    /// Finished without error.
    Success,
    /// Finished with an error (see `ExecutionRecord::error`).
    Failed,
    /// Execution is paused.
    Paused,
    /// Execution was cancelled before completing.
    Cancelled,
}
72
/// How a workflow execution was initiated.
///
/// Serialized in `snake_case` (e.g. `"manual"`, `"webhook"`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TriggerType {
    /// Manually triggered.
    Manual,
    /// Triggered on a schedule.
    Scheduled,
    /// Triggered by an incoming webhook.
    Webhook,
    /// Triggered through the API (the default used by `ExecutionObserver`).
    Api,
}
82
/// Execution details for a single node within a workflow run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeRecord {
    /// Node identifier, used to correlate later events with this record.
    pub node_id: String,
    /// Human-readable node name.
    pub node_name: String,
    /// Node type/kind discriminator (free-form string from the event).
    pub node_kind: String,
    /// Current lifecycle state of the node.
    pub status: NodeStatus,
    /// Start time, in milliseconds since the UNIX epoch.
    pub started_at: u64,
    /// Finish time (epoch ms); `None` while still running.
    pub finished_at: Option<u64>,
    /// Node wall-clock duration in milliseconds (0 until completion).
    pub duration_ms: u64,
    /// Resolved input passed to the node (`Null` until resolved).
    pub input: Value,
    /// Node output; `None` until the node completes successfully.
    pub output: Option<Value>,
    /// Tokens consumed by the node, as reported on completion.
    pub tokens_used: u64,
    /// Input (prompt) tokens consumed by the node.
    pub input_tokens: u64,
    /// Output (completion) tokens produced by the node.
    pub output_tokens: u64,
    /// Total tokens; set from `tokens_used` on completion.
    pub total_tokens: u64,
    /// Estimated cost for this node, in US dollars.
    pub estimated_cost_usd: f64,
    /// Individual LLM invocations made while this node ran.
    pub llm_calls: Vec<LlmCallRecord>,
    /// Number of retry attempts observed for this node.
    pub retry_attempts: u32,
    /// Error message when the node failed.
    pub error: Option<String>,
    /// Individual tool invocations made while this node ran.
    pub tool_calls: Vec<ToolCallRecord>,
}
127
/// Lifecycle state of a single node within an execution.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum NodeStatus {
    /// Not yet started.
    Pending,
    /// Currently executing.
    Running,
    /// Completed successfully.
    Success,
    /// Completed with an error (see `NodeRecord::error`).
    Failed,
    /// Skipped without executing.
    Skipped,
}
138
/// One LLM invocation made by a node, as reported by `NodeLlmCall` events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LlmCallRecord {
    /// Prompt tokens sent.
    pub input_tokens: u64,
    /// Completion tokens received.
    pub output_tokens: u64,
    /// Call duration in milliseconds.
    pub duration_ms: u64,
    /// True when this call was a retry of an earlier attempt.
    pub is_retry: bool,
    /// Producer-supplied reason/description for the call.
    pub reason: String,
}
157
/// One tool invocation made by a node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallRecord {
    /// Name of the tool invoked.
    pub tool: String,
    /// Arguments passed to the tool.
    pub args: Value,
    /// Tool result; `None` when no result was recorded.
    pub result: Option<Value>,
    /// Call duration in milliseconds.
    pub duration_ms: u64,
}
174
/// Aggregate statistics over a set of execution records
/// (see `ExecutionStore::stats`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStats {
    /// Number of records considered.
    pub total_executions: u64,
    /// Records with `Success` status.
    pub success_count: u64,
    /// Records with `Failed` status.
    pub failure_count: u64,
    /// Mean duration in milliseconds (0.0 when there are no records).
    pub avg_duration_ms: f64,
    /// Sum of `total_tokens` over all records.
    pub total_tokens: u64,
    /// Sum of estimated cost over all records, in US dollars.
    pub total_cost_usd: f64,
    /// `failure_count / total_executions` (0.0 when there are no records).
    pub error_rate: f64,
    /// Distinct error messages with counts, most frequent first.
    pub top_errors: Vec<ErrorSummary>,
    /// Per-workflow aggregates keyed by workflow id.
    pub per_workflow: HashMap<String, WorkflowStats>,
}
201
/// Aggregated occurrences of one distinct error message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorSummary {
    /// The error message text (grouping key).
    pub message: String,
    /// How many records carried this message.
    pub count: u64,
    /// `started_at` (epoch ms) of the most recent record with this error.
    pub last_seen: u64,
}
212
/// Aggregate statistics for a single workflow within a stats sample.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStats {
    /// Workflow these statistics describe.
    pub workflow_id: String,
    /// Number of executions of this workflow in the sample.
    pub execution_count: u64,
    /// Fraction of executions with `Success` status (0.0–1.0).
    pub success_rate: f64,
    /// Mean duration per execution, in milliseconds.
    pub avg_duration_ms: f64,
    /// Mean total tokens per execution.
    pub avg_tokens: f64,
    /// Mean estimated cost per execution, in US dollars.
    pub avg_cost_usd: f64,
}
229
/// Criteria for selecting execution records. Every field is optional;
/// `None` means "no restriction", so `Default` yields a match-everything
/// filter.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ExecutionFilter {
    /// Keep only records for this workflow id.
    pub workflow_id: Option<String>,
    /// Keep only records in this status.
    pub status: Option<ExecutionStatus>,
    /// Inclusive lower bound on `started_at` (epoch ms).
    pub since: Option<u64>,
    /// Exclusive upper bound on `started_at` (epoch ms).
    pub until: Option<u64>,
    /// Maximum number of records to return (applied after `offset`).
    pub limit: Option<usize>,
    /// Number of records to skip from the newest end of the result set.
    pub offset: Option<usize>,
}
250
/// Persistence interface for workflow execution records.
///
/// Implementations must be shareable across tasks (`Send + Sync`).
#[async_trait]
pub trait ExecutionStore: Send + Sync {
    /// Inserts or overwrites the record keyed by `record.id`.
    async fn save(&self, record: &ExecutionRecord) -> Result<()>;
    /// Fetches a record by id; `Ok(None)` when absent.
    async fn get(&self, id: &str) -> Result<Option<ExecutionRecord>>;
    /// Returns records matching `filter`, honoring its pagination fields.
    async fn list(&self, filter: &ExecutionFilter) -> Result<Vec<ExecutionRecord>>;
    /// Computes aggregate statistics over records matching `filter`.
    async fn stats(&self, filter: &ExecutionFilter) -> Result<ExecutionStats>;
    /// Removes old finished records; returns how many were deleted.
    ///
    /// NOTE(review): the parameter name suggests `older_than_ms` is an age
    /// in milliseconds, but implementations have treated it as an absolute
    /// epoch-ms cutoff — confirm what callers actually pass.
    async fn prune(&self, older_than_ms: u64) -> Result<u64>;
}
270
/// `ExecutionStore` backed by one pretty-printed JSON file per record,
/// stored under `<base_dir>/executions/`.
pub struct FileExecutionStore {
    /// Root directory; records live in its `executions` subdirectory.
    base_dir: PathBuf,
}
287
288impl FileExecutionStore {
289 pub fn new(dir: impl Into<PathBuf>) -> Self {
291 Self {
292 base_dir: dir.into(),
293 }
294 }
295
296 fn executions_dir(&self) -> PathBuf {
298 self.base_dir.join("executions")
299 }
300
301 fn record_path(&self, id: &str) -> PathBuf {
303 self.executions_dir().join(format!("{id}.json"))
304 }
305
306 async fn read_all(&self, filter: &ExecutionFilter) -> Result<Vec<ExecutionRecord>> {
308 let dir = self.executions_dir();
309 if !dir.exists() {
310 return Ok(Vec::new());
311 }
312
313 let mut entries = fs::read_dir(&dir).await?;
314 let mut records = Vec::new();
315
316 while let Some(entry) = entries.next_entry().await? {
317 let path = entry.path();
318 if path.extension().and_then(|e| e.to_str()) != Some("json") {
319 continue;
320 }
321 let data = fs::read(&path).await?;
322 let record: ExecutionRecord = serde_json::from_slice(&data)
323 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
324
325 if let Some(ref wf_id) = filter.workflow_id {
327 if record.workflow_id != *wf_id {
328 continue;
329 }
330 }
331 if let Some(ref status) = filter.status {
332 if record.status != *status {
333 continue;
334 }
335 }
336 if let Some(since) = filter.since {
337 if record.started_at < since {
338 continue;
339 }
340 }
341 if let Some(until) = filter.until {
342 if record.started_at >= until {
343 continue;
344 }
345 }
346
347 records.push(record);
348 }
349
350 records.sort_by(|a, b| b.started_at.cmp(&a.started_at));
352
353 Ok(records)
354 }
355}
356
357#[async_trait]
358impl ExecutionStore for FileExecutionStore {
359 async fn save(&self, record: &ExecutionRecord) -> Result<()> {
360 let dir = self.executions_dir();
361 fs::create_dir_all(&dir).await?;
362 let path = self.record_path(&record.id);
363 let data = serde_json::to_vec_pretty(record)
364 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
365 fs::write(&path, data).await?;
366 Ok(())
367 }
368
369 async fn get(&self, id: &str) -> Result<Option<ExecutionRecord>> {
370 let path = self.record_path(id);
371 if !path.exists() {
372 return Ok(None);
373 }
374 let data = fs::read(&path).await?;
375 let record: ExecutionRecord = serde_json::from_slice(&data)
376 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
377 Ok(Some(record))
378 }
379
380 async fn list(&self, filter: &ExecutionFilter) -> Result<Vec<ExecutionRecord>> {
381 let mut records = self.read_all(filter).await?;
382
383 if let Some(offset) = filter.offset {
385 if offset >= records.len() {
386 return Ok(Vec::new());
387 }
388 records = records.split_off(offset);
389 }
390
391 if let Some(limit) = filter.limit {
393 records.truncate(limit);
394 }
395
396 Ok(records)
397 }
398
399 async fn stats(&self, filter: &ExecutionFilter) -> Result<ExecutionStats> {
400 let records = self.read_all(filter).await?;
401
402 let total_executions = records.len() as u64;
403 let mut success_count: u64 = 0;
404 let mut failure_count: u64 = 0;
405 let mut total_duration: u64 = 0;
406 let mut total_tokens: u64 = 0;
407 let mut total_cost_usd: f64 = 0.0;
408 let mut error_counts: HashMap<String, (u64, u64)> = HashMap::new(); let mut per_workflow: HashMap<String, Vec<&ExecutionRecord>> = HashMap::new();
410
411 for record in &records {
412 match record.status {
413 ExecutionStatus::Success => success_count += 1,
414 ExecutionStatus::Failed => failure_count += 1,
415 _ => {}
416 }
417 total_duration += record.duration_ms;
418 total_tokens += record.total_tokens;
419 total_cost_usd += record.estimated_cost_usd;
420
421 if let Some(ref err) = record.error {
422 let entry = error_counts.entry(err.clone()).or_insert((0, 0));
423 entry.0 += 1;
424 if record.started_at > entry.1 {
425 entry.1 = record.started_at;
426 }
427 }
428
429 per_workflow
430 .entry(record.workflow_id.clone())
431 .or_default()
432 .push(record);
433 }
434
435 let avg_duration_ms = if total_executions > 0 {
436 total_duration as f64 / total_executions as f64
437 } else {
438 0.0
439 };
440
441 let error_rate = if total_executions > 0 {
442 failure_count as f64 / total_executions as f64
443 } else {
444 0.0
445 };
446
447 let mut top_errors: Vec<ErrorSummary> = error_counts
448 .into_iter()
449 .map(|(message, (count, last_seen))| ErrorSummary {
450 message,
451 count,
452 last_seen,
453 })
454 .collect();
455 top_errors.sort_by(|a, b| b.count.cmp(&a.count));
456
457 let per_workflow = per_workflow
458 .into_iter()
459 .map(|(wf_id, recs)| {
460 let count = recs.len() as u64;
461 let successes = recs
462 .iter()
463 .filter(|r| r.status == ExecutionStatus::Success)
464 .count() as u64;
465 let dur: u64 = recs.iter().map(|r| r.duration_ms).sum();
466 let tok: u64 = recs.iter().map(|r| r.total_tokens).sum();
467 let cost: f64 = recs.iter().map(|r| r.estimated_cost_usd).sum();
468
469 let stats = WorkflowStats {
470 workflow_id: wf_id.clone(),
471 execution_count: count,
472 success_rate: if count > 0 {
473 successes as f64 / count as f64
474 } else {
475 0.0
476 },
477 avg_duration_ms: if count > 0 {
478 dur as f64 / count as f64
479 } else {
480 0.0
481 },
482 avg_tokens: if count > 0 {
483 tok as f64 / count as f64
484 } else {
485 0.0
486 },
487 avg_cost_usd: if count > 0 {
488 cost as f64 / count as f64
489 } else {
490 0.0
491 },
492 };
493 (wf_id, stats)
494 })
495 .collect();
496
497 Ok(ExecutionStats {
498 total_executions,
499 success_count,
500 failure_count,
501 avg_duration_ms,
502 total_tokens,
503 total_cost_usd,
504 error_rate,
505 top_errors,
506 per_workflow,
507 })
508 }
509
510 async fn prune(&self, older_than_ms: u64) -> Result<u64> {
511 let dir = self.executions_dir();
512 if !dir.exists() {
513 return Ok(0);
514 }
515
516 let mut entries = fs::read_dir(&dir).await?;
517 let mut deleted: u64 = 0;
518
519 while let Some(entry) = entries.next_entry().await? {
520 let path = entry.path();
521 if path.extension().and_then(|e| e.to_str()) != Some("json") {
522 continue;
523 }
524 let data = fs::read(&path).await?;
525 let record: ExecutionRecord = match serde_json::from_slice(&data) {
526 Ok(r) => r,
527 Err(_) => continue,
528 };
529
530 if let Some(finished_at) = record.finished_at {
531 if finished_at < older_than_ms {
532 fs::remove_file(&path).await?;
533 deleted += 1;
534 }
535 }
536 }
537
538 Ok(deleted)
539 }
540}
541
/// Current wall-clock time as milliseconds since the UNIX epoch.
///
/// # Panics
/// Only if the system clock is set before 1970-01-01 — a broken-environment
/// condition, not a recoverable error, so `expect` states the invariant
/// instead of a bare `unwrap`.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the UNIX epoch")
        .as_millis() as u64
}
553
/// Listens to workflow events and maintains the `ExecutionRecord` for the
/// currently running execution, persisting it on completion or failure.
///
/// Tracks at most one execution at a time: a new `WorkflowStarted` event
/// replaces whatever record `current` holds.
pub struct ExecutionObserver {
    /// Destination store for flushed records.
    store: Arc<dyn ExecutionStore>,
    /// The in-flight execution record, if a workflow is running.
    current: RwLock<Option<ExecutionRecord>>,
}
564
impl ExecutionObserver {
    /// Creates an observer that persists into `store`. Nothing is tracked
    /// until a `WorkflowStarted` event arrives.
    pub fn new(store: Arc<dyn ExecutionStore>) -> Self {
        Self {
            store,
            current: RwLock::new(None),
        }
    }

    /// Folds one workflow event into the in-memory execution record.
    ///
    /// Lifecycle: `WorkflowStarted` installs a fresh record (replacing any
    /// previous one — only a single execution is tracked at a time); node
    /// events mutate the matching `NodeRecord` by id; `WorkflowCompleted` /
    /// `WorkflowFailed` finalize the record and flush it to the store.
    /// Flush errors are intentionally discarded (`let _ =`) so observation
    /// can never fail a running workflow.
    pub async fn handle_event(&self, event: &WorkflowEvent) {
        match event {
            WorkflowEvent::WorkflowStarted { workflow_id, .. } => {
                // NOTE(review): `workflow_name` is left empty, `input` is
                // Null, and `trigger` is hard-coded to `Api` — confirm
                // whether the event carries these and should populate them.
                let record = ExecutionRecord {
                    id: uuid::Uuid::new_v4().to_string(),
                    workflow_id: workflow_id.clone(),
                    workflow_name: String::new(),
                    status: ExecutionStatus::Running,
                    started_at: now_ms(),
                    finished_at: None,
                    duration_ms: 0,
                    total_tokens: 0,
                    total_input_tokens: 0,
                    total_output_tokens: 0,
                    estimated_cost_usd: 0.0,
                    input: Value::Null,
                    node_records: Vec::new(),
                    error: None,
                    trigger: TriggerType::Api,
                    metadata: HashMap::new(),
                };
                let mut current = self.current.write().await;
                *current = Some(record);
            }

            WorkflowEvent::NodeStarted { node_id, node_name, node_kind, .. } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    // Append a fresh node record in Running state; counters
                    // and input/output are filled in by later events.
                    let node_record = NodeRecord {
                        node_id: node_id.clone(),
                        node_name: node_name.clone(),
                        node_kind: node_kind.clone(),
                        status: NodeStatus::Running,
                        started_at: now_ms(),
                        finished_at: None,
                        duration_ms: 0,
                        input: Value::Null,
                        output: None,
                        tokens_used: 0,
                        input_tokens: 0,
                        output_tokens: 0,
                        total_tokens: 0,
                        estimated_cost_usd: 0.0,
                        llm_calls: Vec::new(),
                        retry_attempts: 0,
                        error: None,
                        tool_calls: Vec::new(),
                    };
                    record.node_records.push(node_record);
                }
            }

            WorkflowEvent::NodeInputResolved { node_id, input, .. } => {
                // Capture the node's resolved input once it is known.
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.input = input.clone();
                    }
                }
            }

            WorkflowEvent::NodeCompleted {
                node_id,
                duration_ms,
                input_tokens,
                output_tokens,
                tokens_used,
                output,
                ..
            } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.status = NodeStatus::Success;
                        node.duration_ms = *duration_ms;
                        node.tokens_used = *tokens_used;
                        node.input_tokens = *input_tokens;
                        node.output_tokens = *output_tokens;
                        // Node-level total mirrors the event's tokens_used.
                        node.total_tokens = *tokens_used;
                        node.finished_at = Some(now_ms());
                        node.output = output.clone();
                    }
                    // Record-level token totals accumulate even when the
                    // node record was not found above.
                    record.total_input_tokens += *input_tokens;
                    record.total_output_tokens += *output_tokens;
                }
            }

            WorkflowEvent::NodeFailed {
                node_id, error, ..
            } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.status = NodeStatus::Failed;
                        node.error = Some(error.clone());
                        node.finished_at = Some(now_ms());
                    }
                }
            }

            WorkflowEvent::NodeRetryAttempt { node_id, .. } => {
                // Count retries; individual attempts also appear as
                // `NodeLlmCall` entries with `is_retry = true`.
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.retry_attempts += 1;
                    }
                }
            }

            WorkflowEvent::WorkflowCompleted {
                duration_ms,
                total_tokens,
                ..
            } => {
                // Inner scope drops the write guard before `flush()` takes
                // the read lock; holding it across the call would deadlock.
                {
                    let mut current = self.current.write().await;
                    if let Some(ref mut record) = *current {
                        record.status = ExecutionStatus::Success;
                        record.finished_at = Some(now_ms());
                        record.duration_ms = *duration_ms;
                        record.total_tokens = *total_tokens;
                    }
                }
                let _ = self.flush().await;
            }

            WorkflowEvent::WorkflowFailed {
                error, duration_ms, ..
            } => {
                // Same guard-scoping as WorkflowCompleted (see above).
                {
                    let mut current = self.current.write().await;
                    if let Some(ref mut record) = *current {
                        record.status = ExecutionStatus::Failed;
                        record.error = Some(error.clone());
                        record.finished_at = Some(now_ms());
                        record.duration_ms = *duration_ms;
                    }
                }
                let _ = self.flush().await;
            }

            WorkflowEvent::NodeLlmCall {
                node_id,
                input_tokens,
                output_tokens,
                duration_ms,
                is_retry,
                reason,
                ..
            } => {
                let mut current = self.current.write().await;
                if let Some(ref mut record) = *current {
                    if let Some(node) = find_node_record(&mut record.node_records, node_id) {
                        node.llm_calls.push(LlmCallRecord {
                            input_tokens: *input_tokens,
                            output_tokens: *output_tokens,
                            duration_ms: *duration_ms,
                            is_retry: *is_retry,
                            reason: reason.clone(),
                        });
                    }
                }
            }

            // All other event kinds are not recorded.
            _ => {}
        }
    }

    /// Persists the current record (if any) to the backing store.
    /// No-op when no execution is being tracked.
    pub async fn flush(&self) -> Result<()> {
        let current = self.current.read().await;
        if let Some(ref record) = *current {
            self.store.save(record).await?;
        }
        Ok(())
    }
}
756
757fn find_node_record<'a>(records: &'a mut [NodeRecord], node_id: &str) -> Option<&'a mut NodeRecord> {
759 records.iter_mut().find(|r| r.node_id == node_id)
760}