haystack_core/graph/
subscriber.rs1use tokio::sync::broadcast;
4
5use super::changelog::{ChangelogGap, GraphDiff};
6use super::shared::SharedGraph;
7
8pub struct GraphSubscriber {
23 graph: SharedGraph,
24 rx: broadcast::Receiver<u64>,
25 last_version: u64,
26}
27
28impl GraphSubscriber {
29 pub fn new(graph: SharedGraph) -> Self {
31 let (rx, last_version) = graph.subscribe_with_version();
32 Self {
33 graph,
34 rx,
35 last_version,
36 }
37 }
38
39 pub fn from_version(graph: SharedGraph, version: u64) -> Self {
43 let rx = graph.subscribe();
44 Self {
45 graph,
46 rx,
47 last_version: version,
48 }
49 }
50
51 pub async fn next_batch(&mut self) -> Result<Vec<GraphDiff>, ChangelogGap> {
57 let mut _latest = match self.rx.recv().await {
60 Ok(v) => v,
61 Err(broadcast::error::RecvError::Lagged(_)) => {
62 self.graph.version()
64 }
65 Err(broadcast::error::RecvError::Closed) => {
66 return Ok(Vec::new());
68 }
69 };
70
71 while let Ok(v) = self.rx.try_recv() {
73 _latest = v;
74 }
75
76 let diffs = self.graph.changes_since(self.last_version)?;
77 if let Some(last) = diffs.last() {
78 self.last_version = last.version;
79 }
80 Ok(diffs)
81 }
82
83 pub fn version(&self) -> u64 {
85 self.last_version
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92 use crate::data::HDict;
93 use crate::graph::EntityGraph;
94 use crate::kinds::{HRef, Kind};
95
96 fn make_site(id: &str) -> HDict {
97 let mut d = HDict::new();
98 d.set("id", Kind::Ref(HRef::from_val(id)));
99 d.set("site", Kind::Marker);
100 d
101 }
102
103 #[tokio::test]
104 async fn subscriber_receives_diffs() {
105 let sg = SharedGraph::new(EntityGraph::new());
106 let mut sub = GraphSubscriber::new(sg.clone());
107 assert_eq!(sub.version(), 0);
108
109 sg.add(make_site("site-1")).unwrap();
110
111 let diffs = sub.next_batch().await.unwrap();
112 assert_eq!(diffs.len(), 1);
113 assert_eq!(diffs[0].ref_val, "site-1");
114 assert_eq!(sub.version(), 1);
115 }
116
117 #[tokio::test]
118 async fn subscriber_coalesces_batches() {
119 let sg = SharedGraph::new(EntityGraph::new());
120 let mut sub = GraphSubscriber::new(sg.clone());
121
122 sg.add(make_site("site-1")).unwrap();
124 sg.add(make_site("site-2")).unwrap();
125 sg.add(make_site("site-3")).unwrap();
126
127 tokio::task::yield_now().await;
129
130 let diffs = sub.next_batch().await.unwrap();
131 assert_eq!(diffs.len(), 3);
132 assert_eq!(sub.version(), 3);
133 }
134
135 #[tokio::test]
136 async fn subscriber_from_version() {
137 let sg = SharedGraph::new(EntityGraph::new());
138 sg.add(make_site("site-1")).unwrap();
139 sg.add(make_site("site-2")).unwrap();
140
141 let mut sub = GraphSubscriber::from_version(sg.clone(), 1);
143
144 sg.add(make_site("site-3")).unwrap();
145
146 let diffs = sub.next_batch().await.unwrap();
147 assert_eq!(diffs.len(), 2); assert_eq!(sub.version(), 3);
149 }
150}