Skip to main content

aida_core/
dispenser.rs

1// trace:ARCH-distributed-dispenser | ai:claude
2//! Sequence Dispenser — generates monotonically increasing sequence numbers
3//! for object ID creation.
4//!
5//! The dispenser is a purely local concern. Once a Node ID is assigned,
6//! sequence numbers require no network coordination. The dispenser interface
7//! is stable; the backing implementation can be swapped (file → SQLite → daemon)
8//! without touching callers.
9//!
10//! In centralized mode (node_id=0), the dispenser generates simple sequential
11//! IDs like `FR-001`. In distributed mode, it generates node-namespaced IDs
12//! like `FR-7-048`.
13
14use anyhow::Result;
15use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17
/// The operating mode for ID generation.
///
/// The mode decides the textual shape of the IDs produced by
/// `Dispenser::format_id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IdMode {
    /// Centralized mode: IDs are `{TYPE}-{SEQ}` (e.g., `FR-001`).
    /// Used when a central database is always available.
    Centralized,
    /// Distributed mode: IDs are `{TYPE}-{NODEID}-{SEQ}` (e.g., `FR-7-048`).
    /// Used for offline-capable, multi-node deployments.
    ///
    /// `node_id` is written into the ID as-is (no zero padding); only the
    /// sequence number is padded.
    Distributed { node_id: u32 },
}
28
/// Current state of the dispenser — all counters for a given node.
///
/// The type is serializable; the file-backed dispenser in this module stores
/// it as TOML.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DispenserState {
    /// The operating mode (decides how IDs are formatted).
    pub mode: IdMode,
    /// Current sequence values per type prefix.
    /// The value is the last dispensed number; next will be value + 1.
    /// The dispensers in this module upper-case the type prefix before using
    /// it as a key, so lookups are effectively case-insensitive.
    pub sequences: HashMap<String, u32>,
}
38
39impl DispenserState {
40    /// Create a new empty state for the given mode.
41    pub fn new(mode: IdMode) -> Self {
42        Self {
43            mode,
44            sequences: HashMap::new(),
45        }
46    }
47}
48
49/// The Dispenser trait — the stable interface for sequence number generation.
50///
51/// Implementations may be backed by:
52/// - A TOML/YAML file with a lockfile (Phase 1)
53/// - A SQLite table (Phase 2)
54/// - A Unix socket daemon (Phase 3)
55///
56/// All implementations must guarantee:
57/// 1. Monotonicity — `next()` never returns a value <= any previous return
58/// 2. Persistence — values survive process restarts
59/// 3. Thread safety — concurrent callers get distinct values
60pub trait Dispenser: Send + Sync {
61    /// Get the next sequence number for the given object type.
62    /// Increments the counter atomically.
63    fn next(&self, object_type: &str) -> Result<u32>;
64
65    /// Peek at the next sequence number without incrementing.
66    fn peek(&self, object_type: &str) -> Result<u32>;
67
68    /// Get the full current state (all counters).
69    fn state(&self) -> Result<DispenserState>;
70
71    /// Format an object ID according to the current mode.
72    ///
73    /// - Centralized: `FR-001`
74    /// - Distributed: `FR-7-001`
75    fn format_id(&self, object_type: &str, seq: u32) -> Result<String> {
76        let state = self.state()?;
77        let digits = 3; // minimum padding
78        match &state.mode {
79            IdMode::Centralized => {
80                Ok(format!("{}-{:0>width$}", object_type, seq, width = digits))
81            }
82            IdMode::Distributed { node_id } => {
83                Ok(format!(
84                    "{}-{}-{:0>width$}",
85                    object_type, node_id, seq,
86                    width = digits
87                ))
88            }
89        }
90    }
91
92    /// Dispense a new sequence number and return the formatted ID.
93    fn next_id(&self, object_type: &str) -> Result<String> {
94        let seq = self.next(object_type)?;
95        self.format_id(object_type, seq)
96    }
97}
98
/// A simple in-memory dispenser for testing and single-session use.
/// Not persistent — loses state on drop.
pub struct MemoryDispenser {
    // Mode and counters, behind a mutex so the `&self` trait methods can
    // mutate them from multiple threads.
    state: std::sync::Mutex<DispenserState>,
}
104
105impl MemoryDispenser {
106    /// Create a new in-memory dispenser.
107    pub fn new(mode: IdMode) -> Self {
108        Self {
109            state: std::sync::Mutex::new(DispenserState::new(mode)),
110        }
111    }
112
113    /// Create with pre-loaded state (e.g., from a file).
114    pub fn with_state(state: DispenserState) -> Self {
115        Self {
116            state: std::sync::Mutex::new(state),
117        }
118    }
119}
120
121impl Dispenser for MemoryDispenser {
122    fn next(&self, object_type: &str) -> Result<u32> {
123        let mut state = self.state.lock().unwrap();
124        let counter = state
125            .sequences
126            .entry(object_type.to_uppercase())
127            .or_insert(0);
128        *counter += 1;
129        Ok(*counter)
130    }
131
132    fn peek(&self, object_type: &str) -> Result<u32> {
133        let state = self.state.lock().unwrap();
134        let current = state
135            .sequences
136            .get(&object_type.to_uppercase())
137            .copied()
138            .unwrap_or(0);
139        Ok(current + 1)
140    }
141
142    fn state(&self) -> Result<DispenserState> {
143        let state = self.state.lock().unwrap();
144        Ok(state.clone())
145    }
146}
147
/// File-backed dispenser using a TOML state file with advisory locking.
/// This is the Phase 1 implementation per the distributed architecture spec.
#[cfg(feature = "native")]
pub struct FileDispenser {
    // Location of the TOML state file. `next()` locks a sibling file with the
    // same name and the extension `lock`.
    path: std::path::PathBuf,
    // Mode supplied at `open()`; `state()` reports this value in place of the
    // mode stored in the file.
    mode: IdMode,
}
155
#[cfg(feature = "native")]
impl FileDispenser {
    /// Create or open a file-backed dispenser.
    ///
    /// Missing parent directories are created. If no state file exists at
    /// `path`, one is created holding an empty state for `mode`; an existing
    /// file is left untouched.
    ///
    /// # Errors
    /// Fails if the directory or file cannot be created, or if the initial
    /// state cannot be serialized.
    pub fn open(path: std::path::PathBuf, mode: IdMode) -> Result<Self> {
        // Ensure parent directory exists
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        // Serialize before touching the filesystem so a serialization error
        // cannot leave an empty state file behind.
        let initial = toml::to_string_pretty(&DispenserState::new(mode.clone()))?;

        // `create_new` fails atomically when the file already exists. A
        // separate "exists?" check followed by a write could overwrite (and
        // thereby reset) a state file that another process created and
        // advanced in between.
        match std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&path)
        {
            Ok(mut file) => std::io::Write::write_all(&mut file, initial.as_bytes())?,
            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {}
            Err(err) => return Err(err.into()),
        }

        Ok(Self { path, mode })
    }

    /// Read and parse the TOML state file.
    fn load_state(&self) -> Result<DispenserState> {
        let content = std::fs::read_to_string(&self.path)?;
        let state: DispenserState = toml::from_str(&content)?;
        Ok(state)
    }

    /// Persist `state` to the state file.
    ///
    /// The new content is written to a sibling `<path>.tmp` file and then
    /// renamed over the state file. Readers that do not hold the lock
    /// (`peek()`, `state()`) therefore never observe a truncated or
    /// half-written file, and an interrupted write leaves the previous
    /// state in place.
    fn save_state(&self, state: &DispenserState) -> Result<()> {
        let content = toml::to_string_pretty(state)?;

        // Append ".tmp" to the full file name (rather than swapping the
        // extension) so the temp file can never coincide with the state
        // file or its `.lock` sibling.
        let mut tmp_name = self.path.clone().into_os_string();
        tmp_name.push(".tmp");
        let tmp_path = std::path::PathBuf::from(tmp_name);

        std::fs::write(&tmp_path, content)?;
        std::fs::rename(&tmp_path, &self.path)?;
        Ok(())
    }
}
186
#[cfg(feature = "native")]
impl Dispenser for FileDispenser {
    /// Advance the counter for `object_type` (case-insensitive), persist the
    /// new state, and return the new value.
    ///
    /// The read-modify-write runs while holding an exclusive advisory lock
    /// on the sibling `.lock` file.
    ///
    /// # Errors
    /// Fails on I/O, locking or (de)serialization errors, or if the counter
    /// is already at `u32::MAX` (the stored state is then left unchanged).
    fn next(&self, object_type: &str) -> Result<u32> {
        use fs2::FileExt;
        use std::fs::OpenOptions;

        // Acquire advisory lock on the state file
        let lock_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(self.path.with_extension("lock"))?;
        lock_file.lock_exclusive()?;

        // Run the update in a closure so that an early `?` return cannot
        // skip the unlock below.
        let result = (|| -> Result<u32> {
            let mut state = self.load_state()?;
            let counter = state
                .sequences
                .entry(object_type.to_uppercase())
                .or_insert(0);
            // A plain `+= 1` would wrap to 0 in release builds and break
            // monotonicity; report exhaustion instead.
            let value = counter.checked_add(1).ok_or_else(|| {
                anyhow::anyhow!("sequence counter for '{}' is exhausted", object_type)
            })?;
            *counter = value;
            self.save_state(&state)?;
            Ok(value)
        })();

        // Always attempt the unlock, but report the update's own error first:
        // it is the root cause and must not be masked by an unlock failure.
        let unlock_result = lock_file.unlock();
        let value = result?;
        unlock_result?;
        Ok(value)
    }

    /// Return the value the next `next()` call would produce, based on the
    /// state file as it is right now. Does not take the lock and does not
    /// modify the file. Unknown types report 1.
    ///
    /// # Errors
    /// Fails if the state file cannot be read or parsed, or if the counter
    /// is exhausted.
    fn peek(&self, object_type: &str) -> Result<u32> {
        let state = self.load_state()?;
        let current = state
            .sequences
            .get(&object_type.to_uppercase())
            .copied()
            .unwrap_or(0);
        current.checked_add(1).ok_or_else(|| {
            anyhow::anyhow!("sequence counter for '{}' is exhausted", object_type)
        })
    }

    /// Load the counters from the state file and report them together with
    /// the mode this dispenser was opened with (the mode stored in the file
    /// is replaced by it).
    fn state(&self) -> Result<DispenserState> {
        let mut state = self.load_state()?;
        state.mode = self.mode.clone();
        Ok(state)
    }
}
234
/// SQLite-backed dispenser using atomic UPSERT for sequence generation.
/// This is the Phase 2 implementation per the distributed architecture spec.
///
/// Uses a single `sequences` table with one row per (node_id, type).
/// SQLite's write serialization handles all concurrency — no external
/// lockfile needed. Natural fit when the local read model is also SQLite.
///
/// Despite its name, the `next_val` column holds the *last dispensed* value:
/// `next()` inserts 1 and returns 1 for a new type, and `peek()` reports
/// `next_val + 1`.
///
/// Schema:
/// ```sql
/// CREATE TABLE IF NOT EXISTS dispenser_sequences (
///     node_id INTEGER NOT NULL,
///     type_prefix TEXT NOT NULL,
///     next_val INTEGER NOT NULL DEFAULT 1,
///     PRIMARY KEY (node_id, type_prefix)
/// );
/// CREATE TABLE IF NOT EXISTS dispenser_meta (
///     key TEXT PRIMARY KEY,
///     value TEXT NOT NULL
/// );
/// ```
#[cfg(feature = "native")]
pub struct SqliteDispenser {
    // Connection shared by all callers; the mutex serializes access to it.
    conn: std::sync::Mutex<rusqlite::Connection>,
    // Mode supplied at `open()`; also determines the `node_id` key column.
    mode: IdMode,
}
260
#[cfg(feature = "native")]
impl SqliteDispenser {
    /// Open or create a SQLite-backed dispenser.
    ///
    /// When `db_path` refers to an existing database (e.g., the local read
    /// model), the dispenser tables are added next to whatever is already
    /// there. Otherwise a new database file is created, along with any
    /// missing parent directories.
    ///
    /// The connection is switched to WAL journaling with a 5000 ms busy
    /// timeout, and `mode` is recorded as JSON under the `mode` key of
    /// `dispenser_meta`, replacing any previous entry.
    pub fn open(db_path: std::path::PathBuf, mode: IdMode) -> Result<Self> {
        if let Some(dir) = db_path.parent() {
            std::fs::create_dir_all(dir)?;
        }

        let connection = rusqlite::Connection::open(&db_path)?;
        connection.execute_batch("PRAGMA journal_mode=WAL; PRAGMA busy_timeout=5000;")?;

        // Idempotent schema setup.
        connection.execute_batch(
            "CREATE TABLE IF NOT EXISTS dispenser_sequences (
                node_id INTEGER NOT NULL,
                type_prefix TEXT NOT NULL,
                next_val INTEGER NOT NULL DEFAULT 1,
                PRIMARY KEY (node_id, type_prefix)
            );
            CREATE TABLE IF NOT EXISTS dispenser_meta (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );"
        )?;

        // Record the mode this dispenser was opened with.
        let serialized_mode = serde_json::to_string(&mode)?;
        connection.execute(
            "INSERT OR REPLACE INTO dispenser_meta (key, value) VALUES ('mode', ?1)",
            rusqlite::params![serialized_mode],
        )?;

        let conn = std::sync::Mutex::new(connection);
        Ok(Self { conn, mode })
    }

    /// Node ID used as the key column in `dispenser_sequences`:
    /// the configured ID in distributed mode, 0 in centralized mode.
    fn node_id(&self) -> u32 {
        match self.mode {
            IdMode::Distributed { node_id } => node_id,
            IdMode::Centralized => 0,
        }
    }
}
311
#[cfg(feature = "native")]
impl Dispenser for SqliteDispenser {
    /// Advance the counter for `object_type` (case-insensitive) and return
    /// the new value. The first value handed out for a type is 1.
    ///
    /// # Errors
    /// Fails if the connection lock is poisoned or the SQL statement fails
    /// (including when the stored value no longer fits in a `u32`).
    fn next(&self, object_type: &str) -> Result<u32> {
        let conn = self
            .conn
            .lock()
            .map_err(|_| anyhow::anyhow!("SQLite dispenser connection lock poisoned"))?;
        let node_id = self.node_id();
        let type_upper = object_type.to_uppercase();

        // Atomic increment via UPSERT + RETURNING: a new row starts at 1, an
        // existing row is bumped by one, and the resulting value is read back
        // in the same statement.
        // NOTE: RETURNING requires SQLite 3.35+; there is no fallback path
        // for older library versions.
        conn.query_row(
            "INSERT INTO dispenser_sequences (node_id, type_prefix, next_val)
                 VALUES (?1, ?2, 1)
                 ON CONFLICT (node_id, type_prefix)
                 DO UPDATE SET next_val = next_val + 1
                 RETURNING next_val",
            rusqlite::params![i64::from(node_id), type_upper],
            |row| row.get(0),
        )
        .map_err(|e| anyhow::anyhow!("SQLite dispenser next() failed: {}", e))
    }

    /// Return the value the next `next()` call would produce, without
    /// modifying the database. Types with no row yet report 1.
    ///
    /// # Errors
    /// Fails if the connection lock is poisoned, the query fails, or the
    /// stored counter is already at `u32::MAX`.
    fn peek(&self, object_type: &str) -> Result<u32> {
        use rusqlite::OptionalExtension;

        let conn = self
            .conn
            .lock()
            .map_err(|_| anyhow::anyhow!("SQLite dispenser connection lock poisoned"))?;
        let node_id = self.node_id();
        let type_upper = object_type.to_uppercase();

        // `next_val` stores the last dispensed value (see `next()`).
        let last_dispensed: Option<u32> = conn
            .query_row(
                "SELECT next_val FROM dispenser_sequences
                 WHERE node_id = ?1 AND type_prefix = ?2",
                rusqlite::params![i64::from(node_id), type_upper],
                |row| row.get(0),
            )
            .optional()?;

        match last_dispensed {
            // If no row exists, next value will be 1
            None => Ok(1),
            // Checked so that an exhausted counter is reported instead of
            // wrapping (release) or panicking (debug).
            Some(last) => last.checked_add(1).ok_or_else(|| {
                anyhow::anyhow!("sequence counter for '{}' is exhausted", object_type)
            }),
        }
    }

    /// Return the mode together with every counter stored for this
    /// dispenser's node ID. Each value is the last dispensed number.
    ///
    /// # Errors
    /// Fails if the connection lock is poisoned or the query fails.
    fn state(&self) -> Result<DispenserState> {
        let conn = self
            .conn
            .lock()
            .map_err(|_| anyhow::anyhow!("SQLite dispenser connection lock poisoned"))?;
        let node_id = self.node_id();

        let mut stmt = conn.prepare(
            "SELECT type_prefix, next_val FROM dispenser_sequences WHERE node_id = ?1",
        )?;

        // Collecting into `Result<HashMap, _>` stops at the first row error.
        let sequences = stmt
            .query_map(rusqlite::params![i64::from(node_id)], |row| {
                Ok((row.get::<_, String>(0)?, row.get::<_, u32>(1)?))
            })?
            .collect::<rusqlite::Result<HashMap<String, u32>>>()?;

        Ok(DispenserState {
            mode: self.mode.clone(),
            sequences,
        })
    }
}
380
// Unit tests for the dispensers. Tests that touch the filesystem or SQLite
// are gated on the `native` feature and run inside a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;

    // Counters are independent per type, and centralized IDs are `TYPE-SEQ`.
    #[test]
    fn test_memory_dispenser_centralized() {
        let d = MemoryDispenser::new(IdMode::Centralized);

        assert_eq!(d.next("FR").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 2);
        assert_eq!(d.next("FR").unwrap(), 3);
        assert_eq!(d.next("FEAT").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 4);

        assert_eq!(d.next_id("FR").unwrap(), "FR-005");
        assert_eq!(d.next_id("FEAT").unwrap(), "FEAT-002");
    }

    // Distributed IDs carry the node ID between the type and the sequence.
    #[test]
    fn test_memory_dispenser_distributed() {
        let d = MemoryDispenser::new(IdMode::Distributed { node_id: 7 });

        assert_eq!(d.next_id("FR").unwrap(), "FR-7-001");
        assert_eq!(d.next_id("FR").unwrap(), "FR-7-002");
        assert_eq!(d.next_id("FEAT").unwrap(), "FEAT-7-001");
    }

    // `peek` reports the upcoming value and leaves the counter alone.
    #[test]
    fn test_peek_does_not_increment() {
        let d = MemoryDispenser::new(IdMode::Centralized);

        assert_eq!(d.peek("FR").unwrap(), 1);
        assert_eq!(d.peek("FR").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 1);
        assert_eq!(d.peek("FR").unwrap(), 2);
    }

    // `state` exposes the mode and the last dispensed value per type.
    #[test]
    fn test_state_snapshot() {
        let d = MemoryDispenser::new(IdMode::Distributed { node_id: 42 });
        d.next("FR").unwrap();
        d.next("FR").unwrap();
        d.next("FEAT").unwrap();

        let state = d.state().unwrap();
        assert_eq!(state.mode, IdMode::Distributed { node_id: 42 });
        assert_eq!(state.sequences.get("FR"), Some(&2));
        assert_eq!(state.sequences.get("FEAT"), Some(&1));
    }

    // Differently-cased spellings of a type share one counter.
    #[test]
    fn test_case_insensitive_type() {
        let d = MemoryDispenser::new(IdMode::Centralized);
        assert_eq!(d.next("fr").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 2);
        assert_eq!(d.next("Fr").unwrap(), 3);
    }

    // File-backed counters survive reopening the same state file.
    #[cfg(feature = "native")]
    #[test]
    fn test_file_dispenser() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dispenser.toml");

        let d = FileDispenser::open(path.clone(), IdMode::Distributed { node_id: 7 }).unwrap();

        assert_eq!(d.next("FR").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 2);
        assert_eq!(d.next_id("FR").unwrap(), "FR-7-003");

        // Reopen — state should persist
        let d2 = FileDispenser::open(path, IdMode::Distributed { node_id: 7 }).unwrap();
        assert_eq!(d2.next("FR").unwrap(), 4);
        assert_eq!(d2.peek("FEAT").unwrap(), 1);
    }

    // Same sequence of calls as the in-memory centralized test, against SQLite
    // in distributed mode.
    #[cfg(feature = "native")]
    #[test]
    fn test_sqlite_dispenser_basic() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dispenser.db");

        let d = SqliteDispenser::open(path, IdMode::Distributed { node_id: 7 }).unwrap();

        assert_eq!(d.next("FR").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 2);
        assert_eq!(d.next("FR").unwrap(), 3);
        assert_eq!(d.next("FEAT").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 4);

        assert_eq!(d.next_id("FR").unwrap(), "FR-7-005");
        assert_eq!(d.next_id("FEAT").unwrap(), "FEAT-7-002");
    }

    // Counters continue where they left off after the first dispenser is dropped.
    #[cfg(feature = "native")]
    #[test]
    fn test_sqlite_dispenser_persistence() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dispenser.db");

        // First session
        {
            let d = SqliteDispenser::open(path.clone(), IdMode::Distributed { node_id: 3 }).unwrap();
            assert_eq!(d.next("FR").unwrap(), 1);
            assert_eq!(d.next("FR").unwrap(), 2);
            assert_eq!(d.next("BUG").unwrap(), 1);
        }

        // Second session — state persisted
        {
            let d = SqliteDispenser::open(path, IdMode::Distributed { node_id: 3 }).unwrap();
            assert_eq!(d.next("FR").unwrap(), 3);
            assert_eq!(d.peek("BUG").unwrap(), 2);
            assert_eq!(d.next_id("BUG").unwrap(), "BUG-3-002");
        }
    }

    // Centralized mode yields IDs without a node component.
    #[cfg(feature = "native")]
    #[test]
    fn test_sqlite_dispenser_centralized() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dispenser.db");

        let d = SqliteDispenser::open(path, IdMode::Centralized).unwrap();

        assert_eq!(d.next_id("FR").unwrap(), "FR-001");
        assert_eq!(d.next_id("FR").unwrap(), "FR-002");
        assert_eq!(d.next_id("BUG").unwrap(), "BUG-001");
    }

    // Differently-cased spellings of a type share one row.
    #[cfg(feature = "native")]
    #[test]
    fn test_sqlite_dispenser_case_insensitive() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dispenser.db");

        let d = SqliteDispenser::open(path, IdMode::Centralized).unwrap();
        assert_eq!(d.next("fr").unwrap(), 1);
        assert_eq!(d.next("FR").unwrap(), 2);
        assert_eq!(d.next("Fr").unwrap(), 3);
    }

    // `state` reports the mode and the last dispensed value per type.
    #[cfg(feature = "native")]
    #[test]
    fn test_sqlite_dispenser_state() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dispenser.db");

        let d = SqliteDispenser::open(path, IdMode::Distributed { node_id: 42 }).unwrap();
        d.next("FR").unwrap();
        d.next("FR").unwrap();
        d.next("FEAT").unwrap();

        let state = d.state().unwrap();
        assert_eq!(state.mode, IdMode::Distributed { node_id: 42 });
        assert_eq!(state.sequences.get("FR"), Some(&2));
        assert_eq!(state.sequences.get("FEAT"), Some(&1));
    }

    // Ten threads sharing one dispenser must receive ten distinct values.
    #[cfg(feature = "native")]
    #[test]
    fn test_sqlite_dispenser_concurrent_threads() {
        use std::sync::Arc;
        use std::thread;

        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dispenser.db");

        let d = Arc::new(SqliteDispenser::open(path, IdMode::Distributed { node_id: 1 }).unwrap());

        let mut handles = vec![];
        for _ in 0..10 {
            let d = Arc::clone(&d);
            handles.push(thread::spawn(move || {
                d.next("FR").unwrap()
            }));
        }

        let mut results: Vec<u32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        // Completion order is not deterministic, so compare the sorted values.
        results.sort();

        // All 10 values should be unique and sequential 1-10
        assert_eq!(results, (1..=10).collect::<Vec<u32>>());
    }

    // Opening a dispenser on a database that already has unrelated tables
    // must leave their contents untouched.
    #[cfg(feature = "native")]
    #[test]
    fn test_sqlite_dispenser_coexists_with_other_tables() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("shared.db");

        // Create a database with some other table first
        {
            let conn = rusqlite::Connection::open(&path).unwrap();
            conn.execute("CREATE TABLE other_data (id INTEGER PRIMARY KEY, value TEXT)", []).unwrap();
            conn.execute("INSERT INTO other_data (value) VALUES ('hello')", []).unwrap();
        }

        // Open dispenser on the same database — should not interfere
        let d = SqliteDispenser::open(path.clone(), IdMode::Centralized).unwrap();
        assert_eq!(d.next("FR").unwrap(), 1);

        // Verify the other table is intact
        {
            let conn = rusqlite::Connection::open(&path).unwrap();
            let val: String = conn.query_row("SELECT value FROM other_data", [], |r| r.get(0)).unwrap();
            assert_eq!(val, "hello");
        }
    }
}