haystack_core/graph/
subscriber.rs1use tokio::sync::broadcast;
4
5use super::changelog::{ChangelogGap, GraphDiff};
6use super::shared::SharedGraph;
7
8pub struct GraphSubscriber {
23 graph: SharedGraph,
24 rx: broadcast::Receiver<u64>,
25 last_version: u64,
26}
27
28impl GraphSubscriber {
29 pub fn new(graph: SharedGraph) -> Self {
31 let rx = graph.subscribe();
32 let last_version = graph.version();
33 Self {
34 graph,
35 rx,
36 last_version,
37 }
38 }
39
40 pub fn from_version(graph: SharedGraph, version: u64) -> Self {
44 let rx = graph.subscribe();
45 Self {
46 graph,
47 rx,
48 last_version: version,
49 }
50 }
51
52 pub async fn next_batch(&mut self) -> Result<Vec<GraphDiff>, ChangelogGap> {
58 let mut _latest = match self.rx.recv().await {
61 Ok(v) => v,
62 Err(broadcast::error::RecvError::Lagged(_)) => {
63 self.graph.version()
65 }
66 Err(broadcast::error::RecvError::Closed) => {
67 return Ok(Vec::new());
69 }
70 };
71
72 while let Ok(v) = self.rx.try_recv() {
74 _latest = v;
75 }
76
77 let diffs = self.graph.changes_since(self.last_version)?;
78 if let Some(last) = diffs.last() {
79 self.last_version = last.version;
80 }
81 Ok(diffs)
82 }
83
84 pub fn version(&self) -> u64 {
86 self.last_version
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::data::HDict;
    use crate::graph::EntityGraph;
    use crate::kinds::{HRef, Kind};

    /// Builds a dict with an `id` ref and a `site` marker tag.
    fn make_site(id: &str) -> HDict {
        let mut site = HDict::new();
        site.set("id", Kind::Ref(HRef::from_val(id)));
        site.set("site", Kind::Marker);
        site
    }

    /// A single write after subscribing yields a single diff.
    #[tokio::test]
    async fn subscriber_receives_diffs() {
        let graph = SharedGraph::new(EntityGraph::new());
        let mut subscriber = GraphSubscriber::new(graph.clone());
        assert_eq!(subscriber.version(), 0);

        graph.add(make_site("site-1")).unwrap();

        let batch = subscriber.next_batch().await.unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].ref_val, "site-1");
        assert_eq!(subscriber.version(), 1);
    }

    /// Several writes made before polling arrive as one batch.
    #[tokio::test]
    async fn subscriber_coalesces_batches() {
        let graph = SharedGraph::new(EntityGraph::new());
        let mut subscriber = GraphSubscriber::new(graph.clone());

        for id in ["site-1", "site-2", "site-3"] {
            graph.add(make_site(id)).unwrap();
        }

        tokio::task::yield_now().await;

        let batch = subscriber.next_batch().await.unwrap();
        assert_eq!(batch.len(), 3);
        assert_eq!(subscriber.version(), 3);
    }

    /// A subscriber resumed from an older version also receives the changes
    /// made before it was created, once a new write is announced.
    #[tokio::test]
    async fn subscriber_from_version() {
        let graph = SharedGraph::new(EntityGraph::new());
        graph.add(make_site("site-1")).unwrap();
        graph.add(make_site("site-2")).unwrap();

        let mut subscriber = GraphSubscriber::from_version(graph.clone(), 1);

        graph.add(make_site("site-3")).unwrap();

        let batch = subscriber.next_batch().await.unwrap();
        assert_eq!(batch.len(), 2);
        assert_eq!(subscriber.version(), 3);
    }
}