Skip to main content

axon/
flow_version.rs

1//! Flow Versioning — version tracking and rollback for deployed flows.
2//!
3//! Each deploy of a flow creates a new version entry. The version registry
4//! tracks the full history per flow, supporting:
5//!   - Version listing with metadata (source hash, timestamp, deploy count)
6//!   - Active version tracking (which version is currently live)
7//!   - Rollback to any previous version
8//!   - Source diff between versions (via stored source snapshots)
9//!
10//! Versions are identified by a monotonic counter per flow (v1, v2, v3...).
11//! The active version is always the most recently deployed unless rolled back.
12
13use std::collections::HashMap;
14use std::time::{Duration, Instant};
15
16// ── Version types ────────────────────────────────────────────────────────
17
18/// A single version of a deployed flow.
19#[derive(Debug, Clone, serde::Serialize)]
20pub struct FlowVersion {
21    /// Version number (1-indexed, monotonic per flow).
22    pub version: u32,
23    /// SHA-256 hash of the source code (first 12 hex chars).
24    pub source_hash: String,
25    /// Full source snapshot for rollback.
26    #[serde(skip_serializing)]
27    pub source: String,
28    /// Original filename.
29    pub source_file: String,
30    /// Backend used for compilation.
31    pub backend: String,
32    /// Flow names extracted from this source.
33    pub flow_names: Vec<String>,
34    /// Time of deployment (elapsed since registry creation).
35    pub deployed_at: Duration,
36    /// Whether this version is currently active.
37    pub active: bool,
38}
39
40/// Version history for a single flow.
41#[derive(Debug, Clone)]
42pub struct FlowHistory {
43    /// Flow name (key).
44    pub flow_name: String,
45    /// All versions, ordered by version number.
46    pub versions: Vec<FlowVersion>,
47    /// Currently active version number.
48    pub active_version: u32,
49    /// Total deploy count.
50    pub deploy_count: u32,
51}
52
53impl FlowHistory {
54    fn new(flow_name: &str) -> Self {
55        FlowHistory {
56            flow_name: flow_name.to_string(),
57            versions: Vec::new(),
58            active_version: 0,
59            deploy_count: 0,
60        }
61    }
62
63    /// Add a new version. Returns the version number.
64    fn push_version(&mut self, source: &str, source_file: &str, backend: &str, flow_names: &[String], deployed_at: Duration) -> u32 {
65        self.deploy_count += 1;
66        let version = self.deploy_count;
67
68        // Deactivate previous active version
69        for v in &mut self.versions {
70            v.active = false;
71        }
72
73        let hash = hash_source(source);
74        self.versions.push(FlowVersion {
75            version,
76            source_hash: hash,
77            source: source.to_string(),
78            source_file: source_file.to_string(),
79            backend: backend.to_string(),
80            flow_names: flow_names.to_vec(),
81            deployed_at,
82            active: true,
83        });
84        self.active_version = version;
85        version
86    }
87
88    /// Get a specific version.
89    fn get_version(&self, version: u32) -> Option<&FlowVersion> {
90        self.versions.iter().find(|v| v.version == version)
91    }
92
93    /// Get the active version.
94    pub fn active(&self) -> Option<&FlowVersion> {
95        self.versions.iter().find(|v| v.active)
96    }
97
98    /// Rollback to a specific version. Returns Ok(source) or Err if version not found.
99    fn rollback(&mut self, target_version: u32) -> Result<String, String> {
100        let exists = self.versions.iter().any(|v| v.version == target_version);
101        if !exists {
102            return Err(format!("version {} not found for flow '{}'", target_version, self.flow_name));
103        }
104
105        for v in &mut self.versions {
106            v.active = v.version == target_version;
107        }
108        self.active_version = target_version;
109
110        let source = self.versions.iter()
111            .find(|v| v.version == target_version)
112            .map(|v| v.source.clone())
113            .unwrap();
114
115        Ok(source)
116    }
117}
118
119// ── Version Registry ─────────────────────────────────────────────────────
120
121/// Registry tracking version history for all deployed flows.
122pub struct VersionRegistry {
123    histories: HashMap<String, FlowHistory>,
124    created_at: Instant,
125}
126
127impl VersionRegistry {
128    /// Create a new empty registry.
129    pub fn new() -> Self {
130        VersionRegistry {
131            histories: HashMap::new(),
132            created_at: Instant::now(),
133        }
134    }
135
136    /// Record a new deployment. Returns (flow_name, version_number) pairs.
137    pub fn record_deploy(
138        &mut self,
139        flow_names: &[String],
140        source: &str,
141        source_file: &str,
142        backend: &str,
143    ) -> Vec<(String, u32)> {
144        let deployed_at = self.created_at.elapsed();
145        let mut results = Vec::new();
146
147        for name in flow_names {
148            let history = self.histories
149                .entry(name.clone())
150                .or_insert_with(|| FlowHistory::new(name));
151
152            let version = history.push_version(source, source_file, backend, flow_names, deployed_at);
153            results.push((name.clone(), version));
154        }
155
156        results
157    }
158
159    /// Get version history for a flow.
160    pub fn get_history(&self, flow_name: &str) -> Option<&FlowHistory> {
161        self.histories.get(flow_name)
162    }
163
164    /// Get a specific version of a flow.
165    pub fn get_version(&self, flow_name: &str, version: u32) -> Option<&FlowVersion> {
166        self.histories.get(flow_name)?.get_version(version)
167    }
168
169    /// Get the active version of a flow.
170    pub fn get_active(&self, flow_name: &str) -> Option<&FlowVersion> {
171        self.histories.get(flow_name)?.active()
172    }
173
174    /// Rollback a flow to a specific version. Returns the source code.
175    pub fn rollback(&mut self, flow_name: &str, target_version: u32) -> Result<String, String> {
176        let history = self.histories.get_mut(flow_name)
177            .ok_or_else(|| format!("flow '{}' not found", flow_name))?;
178        history.rollback(target_version)
179    }
180
181    /// List all flows with their active version.
182    pub fn list_flows(&self) -> Vec<FlowVersionSummary> {
183        let mut flows: Vec<FlowVersionSummary> = self.histories.values().map(|h| {
184            FlowVersionSummary {
185                flow_name: h.flow_name.clone(),
186                active_version: h.active_version,
187                total_versions: h.versions.len() as u32,
188                deploy_count: h.deploy_count,
189                source_hash: h.active().map(|v| v.source_hash.clone()).unwrap_or_default(),
190            }
191        }).collect();
192        flows.sort_by(|a, b| a.flow_name.cmp(&b.flow_name));
193        flows
194    }
195
196    /// Total number of tracked flows.
197    pub fn flow_count(&self) -> usize {
198        self.histories.len()
199    }
200
201    /// Total number of versions across all flows.
202    pub fn total_versions(&self) -> usize {
203        self.histories.values().map(|h| h.versions.len()).sum()
204    }
205}
206
207/// Summary of a flow's version state.
208#[derive(Debug, Clone, serde::Serialize)]
209pub struct FlowVersionSummary {
210    pub flow_name: String,
211    pub active_version: u32,
212    pub total_versions: u32,
213    pub deploy_count: u32,
214    pub source_hash: String,
215}
216
217// ── Helpers ──────────────────────────────────────────────────────────────
218
/// Fingerprint `source` as a 12-character lowercase hex string.
///
/// The value is the leading 12 hex digits (top 48 bits) of the 64-bit
/// FNV-1a hash of the UTF-8 bytes. FNV-1a is used so no crypto dependency
/// is needed; it is not collision-resistant against adversarial input.
fn hash_source(source: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    let digest = source
        .bytes()
        .fold(FNV_OFFSET_BASIS, |acc, b| (acc ^ u64::from(b)).wrapping_mul(FNV_PRIME));

    // Dropping the low 16 bits leaves the top 48 bits, i.e. the first 12 of
    // the 16 zero-padded hex digits.
    format!("{:012x}", digest >> 16)
}
229
230// ── Tests ────────────────────────────────────────────────────────────────
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned flow-name list from string literals.
    fn names(list: &[&str]) -> Vec<String> {
        list.iter().map(|s| s.to_string()).collect()
    }

    #[test]
    fn hash_deterministic() {
        let h1 = hash_source("persona P { tone: \"analytical\" }");
        let h2 = hash_source("persona P { tone: \"analytical\" }");
        assert_eq!(h1, h2);
        assert_eq!(h1.len(), 12);
    }

    #[test]
    fn hash_differs_for_different_source() {
        let h1 = hash_source("version 1");
        let h2 = hash_source("version 2");
        assert_ne!(h1, h2);
    }

    #[test]
    fn registry_record_deploy() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["Flow1"]);
        let results = reg.record_deploy(&flows, "source v1", "test.axon", "anthropic");

        assert_eq!(results.len(), 1);
        assert_eq!(results[0], ("Flow1".to_string(), 1));
        assert_eq!(reg.flow_count(), 1);
        assert_eq!(reg.total_versions(), 1);
    }

    #[test]
    fn registry_multiple_deploys() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["F"]);

        reg.record_deploy(&flows, "v1 source", "f.axon", "anthropic");
        reg.record_deploy(&flows, "v2 source", "f.axon", "anthropic");
        reg.record_deploy(&flows, "v3 source", "f.axon", "anthropic");

        assert_eq!(reg.flow_count(), 1);
        assert_eq!(reg.total_versions(), 3);

        let history = reg.get_history("F").unwrap();
        assert_eq!(history.deploy_count, 3);
        assert_eq!(history.active_version, 3);
        assert_eq!(history.versions.len(), 3);

        // Only v3 should be active
        assert!(!history.versions[0].active);
        assert!(!history.versions[1].active);
        assert!(history.versions[2].active);
    }

    #[test]
    fn registry_get_version() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["F"]);
        reg.record_deploy(&flows, "src1", "f.axon", "anthropic");
        reg.record_deploy(&flows, "src2", "f.axon", "anthropic");

        let v1 = reg.get_version("F", 1).unwrap();
        assert_eq!(v1.version, 1);
        assert_eq!(v1.source, "src1");
        assert!(!v1.active);

        let v2 = reg.get_version("F", 2).unwrap();
        assert_eq!(v2.version, 2);
        assert!(v2.active);

        assert!(reg.get_version("F", 99).is_none());
        assert!(reg.get_version("NoSuch", 1).is_none());
    }

    #[test]
    fn registry_get_active() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["F"]);
        reg.record_deploy(&flows, "src1", "f.axon", "anthropic");
        reg.record_deploy(&flows, "src2", "f.axon", "anthropic");

        let active = reg.get_active("F").unwrap();
        assert_eq!(active.version, 2);
        assert!(active.active);
    }

    #[test]
    fn registry_rollback() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["F"]);
        reg.record_deploy(&flows, "source v1", "f.axon", "anthropic");
        reg.record_deploy(&flows, "source v2", "f.axon", "anthropic");
        reg.record_deploy(&flows, "source v3", "f.axon", "anthropic");

        // Active is v3
        assert_eq!(reg.get_active("F").unwrap().version, 3);

        // Rollback to v1
        let source = reg.rollback("F", 1).unwrap();
        assert_eq!(source, "source v1");
        assert_eq!(reg.get_active("F").unwrap().version, 1);

        // v2 and v3 should be inactive
        let h = reg.get_history("F").unwrap();
        assert!(h.versions[0].active); // v1
        assert!(!h.versions[1].active); // v2
        assert!(!h.versions[2].active); // v3
    }

    #[test]
    fn registry_rollback_not_found() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["F"]);
        reg.record_deploy(&flows, "src", "f.axon", "anthropic");

        assert!(reg.rollback("F", 99).is_err());
        assert!(reg.rollback("NoSuch", 1).is_err());

        // A failed rollback must leave the active version untouched.
        assert_eq!(reg.get_active("F").unwrap().version, 1);
    }

    #[test]
    fn registry_redeploy_after_rollback() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["F"]);
        reg.record_deploy(&flows, "source v1", "f.axon", "anthropic");
        reg.record_deploy(&flows, "source v2", "f.axon", "anthropic");
        reg.rollback("F", 1).unwrap();

        // Version numbers follow the deploy counter, not the active version.
        let results = reg.record_deploy(&flows, "source v3", "f.axon", "anthropic");
        assert_eq!(results[0], ("F".to_string(), 3));
        assert_eq!(reg.get_active("F").unwrap().version, 3);
        assert_eq!(reg.total_versions(), 3);
        assert!(!reg.get_version("F", 1).unwrap().active);
    }

    #[test]
    fn registry_list_flows() {
        let mut reg = VersionRegistry::new();
        reg.record_deploy(&["Alpha".to_string()], "a", "a.axon", "anthropic");
        reg.record_deploy(&["Beta".to_string()], "b", "b.axon", "anthropic");
        reg.record_deploy(&["Alpha".to_string()], "a2", "a.axon", "anthropic");

        let list = reg.list_flows();
        assert_eq!(list.len(), 2);
        assert_eq!(list[0].flow_name, "Alpha");
        assert_eq!(list[0].active_version, 2);
        assert_eq!(list[0].total_versions, 2);
        assert_eq!(list[1].flow_name, "Beta");
        assert_eq!(list[1].active_version, 1);
    }

    #[test]
    fn registry_multi_flow_deploy() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["A", "B"]);
        let results = reg.record_deploy(&flows, "multi", "m.axon", "anthropic");

        assert_eq!(results.len(), 2);
        assert_eq!(reg.flow_count(), 2);

        // Both should be at version 1
        assert_eq!(reg.get_active("A").unwrap().version, 1);
        assert_eq!(reg.get_active("B").unwrap().version, 1);
    }

    #[test]
    fn version_source_hash_stored() {
        let mut reg = VersionRegistry::new();
        let flows = names(&["F"]);
        reg.record_deploy(&flows, "persona P { tone: \"x\" }", "f.axon", "anthropic");

        let v = reg.get_version("F", 1).unwrap();
        assert!(!v.source_hash.is_empty());
        assert_eq!(v.source_hash.len(), 12);
    }

    #[test]
    fn flow_version_summary_serializes() {
        let summary = FlowVersionSummary {
            flow_name: "Test".into(),
            active_version: 3,
            total_versions: 5,
            deploy_count: 5,
            source_hash: "abc123def456".into(),
        };
        let json = serde_json::to_value(&summary).unwrap();
        assert_eq!(json["flow_name"], "Test");
        assert_eq!(json["active_version"], 3);
    }
}