Skip to main content

agent_store/
doorbell.rs

1//! The commit doorbell — the seam for local multi-agent coordination.
2//!
3//! When an agent commits a turn, co-located agents should wake and read it
4//! instead of polling the file. The substrate does **not** know about the
5//! mesh: it only emits a [`CommitEvent`]. A consumer (newt's session loop,
6//! modulex's MCP server) subscribes and bridges the event onto agent-mesh —
7//! publishing the causal pointer `(writer, seq)` on a per-stream topic. A peer
8//! that misses the doorbell still catches up by reading entries past its
9//! last-seen seq on its next load.
10//!
11//! The payload is a **causal pointer, never a timestamp** — it composes with
12//! the mesh's per-peer sequence tracking and honors "wall-clock is a claim,
13//! never a coordination primitive."
14
15use std::sync::{Arc, Mutex};
16
17/// What a commit announces: where it landed and its content hash. Just enough
18/// for a peer to fetch the entry — never the payload itself.
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct CommitEvent {
21    pub stream: String,
22    pub writer: String,
23    pub seq: u64,
24    pub content_hash: [u8; 32],
25}
26
27type Subscriber = Box<dyn Fn(&CommitEvent) + Send + Sync>;
28
29/// A fan-out of commit subscribers. Cloneable and shareable; clones observe
30/// the same subscriber set.
31#[derive(Clone, Default)]
32pub struct Doorbell {
33    subscribers: Arc<Mutex<Vec<Subscriber>>>,
34}
35
36impl Doorbell {
37    pub fn new() -> Self {
38        Self::default()
39    }
40
41    /// Register a callback invoked for every subsequent [`Doorbell::ring`].
42    pub fn subscribe<F>(&self, f: F)
43    where
44        F: Fn(&CommitEvent) + Send + Sync + 'static,
45    {
46        self.subscribers
47            .lock()
48            .expect("doorbell mutex poisoned")
49            .push(Box::new(f));
50    }
51
52    /// Notify every subscriber of a commit. Consumers call this immediately
53    /// after a successful [`crate::WriterLog::append`].
54    pub fn ring(&self, event: &CommitEvent) {
55        for sub in self
56            .subscribers
57            .lock()
58            .expect("doorbell mutex poisoned")
59            .iter()
60        {
61            sub(event);
62        }
63    }
64}
65
66#[cfg(test)]
67mod tests {
68    use super::*;
69
70    #[test]
71    fn delivers_events_to_subscribers() {
72        let bell = Doorbell::new();
73        let seen: Arc<Mutex<Vec<u64>>> = Arc::new(Mutex::new(Vec::new()));
74        let seen_clone = Arc::clone(&seen);
75        bell.subscribe(move |e| seen_clone.lock().unwrap().push(e.seq));
76
77        bell.ring(&CommitEvent {
78            stream: "conv:x".into(),
79            writer: "alice".into(),
80            seq: 1,
81            content_hash: [0u8; 32],
82        });
83        bell.ring(&CommitEvent {
84            stream: "conv:x".into(),
85            writer: "alice".into(),
86            seq: 2,
87            content_hash: [0u8; 32],
88        });
89
90        assert_eq!(*seen.lock().unwrap(), vec![1, 2]);
91    }
92
93    #[test]
94    fn multiple_subscribers_all_fire() {
95        let bell = Doorbell::new();
96        let count = Arc::new(Mutex::new(0u32));
97        for _ in 0..3 {
98            let c = Arc::clone(&count);
99            bell.subscribe(move |_| *c.lock().unwrap() += 1);
100        }
101        bell.ring(&CommitEvent {
102            stream: "s".into(),
103            writer: "w".into(),
104            seq: 1,
105            content_hash: [0u8; 32],
106        });
107        assert_eq!(*count.lock().unwrap(), 3);
108    }
109}