Skip to main content

construct/sop/
audit.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use tracing::{info, warn};
5
6use super::types::{SopRun, SopStepResult};
7use crate::memory::traits::{Memory, MemoryCategory};
8
/// Name of the custom memory category under which SOP audit entries are stored.
const SOP_CATEGORY: &str = "sop";
10
11/// Persists SOP execution runs and step results to the Memory backend.
12///
13/// Storage keys:
14/// - `sop_run_{run_id}` — full `SopRun` JSON (created on start, updated on complete)
15/// - `sop_step_{run_id}_{step_number}` — `SopStepResult` JSON (one per step)
16pub struct SopAuditLogger {
17    memory: Arc<dyn Memory>,
18}
19
20impl SopAuditLogger {
21    pub fn new(memory: Arc<dyn Memory>) -> Self {
22        Self { memory }
23    }
24
25    /// Log the start of a new SOP run.
26    pub async fn log_run_start(&self, run: &SopRun) -> Result<()> {
27        let key = run_key(&run.run_id);
28        let content = serde_json::to_string_pretty(run)?;
29        self.memory.store(&key, &content, category(), None).await?;
30        info!(
31            "SOP audit: run {} started for '{}'",
32            run.run_id, run.sop_name
33        );
34        Ok(())
35    }
36
37    /// Log a step result.
38    pub async fn log_step_result(&self, run_id: &str, result: &SopStepResult) -> Result<()> {
39        let key = step_key(run_id, result.step_number);
40        let content = serde_json::to_string_pretty(result)?;
41        self.memory.store(&key, &content, category(), None).await?;
42        Ok(())
43    }
44
45    /// Log run completion (updates the run record with final state).
46    pub async fn log_run_complete(&self, run: &SopRun) -> Result<()> {
47        let key = run_key(&run.run_id);
48        let content = serde_json::to_string_pretty(run)?;
49        self.memory.store(&key, &content, category(), None).await?;
50        info!(
51            "SOP audit: run {} finished with status {}",
52            run.run_id, run.status
53        );
54        Ok(())
55    }
56
57    /// Log an operator approval event for a specific step.
58    pub async fn log_approval(&self, run: &SopRun, step_number: u32) -> Result<()> {
59        let key = format!("sop_approval_{}_{step_number}", run.run_id);
60        let content = serde_json::to_string_pretty(run)?;
61        self.memory.store(&key, &content, category(), None).await?;
62        info!(
63            "SOP audit: run {} step {step_number} approved by operator",
64            run.run_id
65        );
66        Ok(())
67    }
68
69    /// Log a timeout-based auto-approval event for a specific step.
70    pub async fn log_timeout_auto_approve(&self, run: &SopRun, step_number: u32) -> Result<()> {
71        let key = format!("sop_timeout_approve_{}_{step_number}", run.run_id);
72        let content = serde_json::to_string_pretty(run)?;
73        self.memory.store(&key, &content, category(), None).await?;
74        info!(
75            "SOP audit: run {} step {step_number} auto-approved after timeout",
76            run.run_id
77        );
78        Ok(())
79    }
80
81    /// Retrieve a stored run by ID (if it exists in memory).
82    pub async fn get_run(&self, run_id: &str) -> Result<Option<SopRun>> {
83        let key = run_key(run_id);
84        match self.memory.get(&key).await? {
85            Some(entry) => {
86                let run: SopRun = serde_json::from_str(&entry.content).map_err(|e| {
87                    warn!("SOP audit: failed to parse run {run_id}: {e}");
88                    e
89                })?;
90                Ok(Some(run))
91            }
92            None => Ok(None),
93        }
94    }
95
96    /// List all stored SOP run keys.
97    pub async fn list_runs(&self) -> Result<Vec<String>> {
98        let entries = self.memory.list(Some(&category()), None).await?;
99        let run_keys: Vec<String> = entries
100            .into_iter()
101            .filter(|e| e.key.starts_with("sop_run_"))
102            .map(|e| e.key)
103            .collect();
104        Ok(run_keys)
105    }
106}
107
/// Builds the storage key for a run record: `sop_run_{run_id}`.
fn run_key(run_id: &str) -> String {
    format!("sop_run_{run_id}")
}
111
/// Builds the storage key for a step result: `sop_step_{run_id}_{step_number}`.
fn step_key(run_id: &str, step_number: u32) -> String {
    format!("sop_step_{run_id}_{step_number}")
}
115
116fn category() -> MemoryCategory {
117    MemoryCategory::Custom(SOP_CATEGORY.into())
118}
119
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sop::types::{SopEvent, SopRunStatus, SopStepStatus, SopTriggerSource};

    /// Builds a fresh test memory backend so each test starts from empty storage.
    fn test_memory() -> Arc<dyn Memory> {
        Arc::new(crate::memory::test_memory::TestMemory::new())
    }

    /// A manually triggered, still-running three-step run with no step results yet.
    fn test_run() -> SopRun {
        SopRun {
            run_id: "run-test-001".into(),
            sop_name: "test-sop".into(),
            trigger_event: SopEvent {
                source: SopTriggerSource::Manual,
                topic: None,
                payload: None,
                timestamp: "2026-02-19T12:00:00Z".into(),
            },
            status: SopRunStatus::Running,
            current_step: 1,
            total_steps: 3,
            started_at: "2026-02-19T12:00:00Z".into(),
            completed_at: None,
            step_results: Vec::new(),
            waiting_since: None,
            llm_calls_saved: 0,
        }
    }

    /// A completed result for step `n`.
    fn test_step_result(n: u32) -> SopStepResult {
        SopStepResult {
            step_number: n,
            status: SopStepStatus::Completed,
            output: format!("Step {n} completed"),
            started_at: "2026-02-19T12:00:00Z".into(),
            completed_at: Some("2026-02-19T12:00:05Z".into()),
        }
    }

    #[tokio::test]
    async fn audit_roundtrip() {
        let memory = test_memory();
        let logger = SopAuditLogger::new(Arc::clone(&memory));

        // Log run start
        let run = test_run();
        logger.log_run_start(&run).await.unwrap();

        // Log step result and confirm it landed under the documented step key
        let step = test_step_result(1);
        logger.log_step_result(&run.run_id, &step).await.unwrap();
        let stored_step = memory
            .get(&step_key(&run.run_id, 1))
            .await
            .unwrap()
            .expect("step result should be stored under its step key");
        let parsed_step: SopStepResult = serde_json::from_str(&stored_step.content).unwrap();
        assert_eq!(parsed_step.step_number, 1);

        // Log run complete
        let mut completed_run = run.clone();
        completed_run.status = SopRunStatus::Completed;
        completed_run.completed_at = Some("2026-02-19T12:05:00Z".into());
        completed_run.step_results = vec![step];
        logger.log_run_complete(&completed_run).await.unwrap();

        // Retrieve: the completion write must be what `get_run` returns
        let retrieved = logger.get_run("run-test-001").await.unwrap().unwrap();
        assert_eq!(retrieved.run_id, "run-test-001");
        assert_eq!(retrieved.status, SopRunStatus::Completed);
        assert_eq!(retrieved.step_results.len(), 1);

        // List runs: the run key is present and step entries are filtered out
        let keys = logger.list_runs().await.unwrap();
        assert!(keys.contains(&"sop_run_run-test-001".to_string()));
        assert!(keys.iter().all(|k| k.starts_with("sop_run_")));
    }

    #[tokio::test]
    async fn log_approval_persists_entry() {
        let memory = test_memory();
        let logger = SopAuditLogger::new(Arc::clone(&memory));
        let run = test_run();
        logger.log_approval(&run, 1).await.unwrap();

        let entries = memory.list(Some(&category()), None).await.unwrap();
        let approval_keys: Vec<_> = entries
            .iter()
            .filter(|e| e.key.starts_with("sop_approval_"))
            .collect();
        assert_eq!(approval_keys.len(), 1);
        // Pin the whole key so a wrong step number is caught too.
        assert_eq!(approval_keys[0].key, "sop_approval_run-test-001_1");
    }

    #[tokio::test]
    async fn log_timeout_auto_approve_persists_entry() {
        let memory = test_memory();
        let logger = SopAuditLogger::new(Arc::clone(&memory));
        let run = test_run();
        logger.log_timeout_auto_approve(&run, 1).await.unwrap();

        let entries = memory.list(Some(&category()), None).await.unwrap();
        let timeout_keys: Vec<_> = entries
            .iter()
            .filter(|e| e.key.starts_with("sop_timeout_approve_"))
            .collect();
        assert_eq!(timeout_keys.len(), 1);
        // Pin the whole key so a wrong step number is caught too.
        assert_eq!(timeout_keys[0].key, "sop_timeout_approve_run-test-001_1");
    }

    #[tokio::test]
    async fn get_nonexistent_run_returns_none() {
        let logger = SopAuditLogger::new(test_memory());
        let result = logger.get_run("nonexistent").await.unwrap();
        assert!(result.is_none());
    }
}