1use crate::{Result, SUBSCRIPTION_REL_SUB_DB};
2use borderless::common::{Id, Introduction};
3use borderless::events::Topic;
4use borderless::{AgentId, Context};
5use borderless_kv_store::{Db, RawWrite, RoCursor, RoTx};
6use std::str::FromStr;
7
8fn generate_key(publisher: Id, topic: String, subscriber: Option<AgentId>) -> String {
14 let publisher = match publisher {
16 Id::Contract { contract_id } => contract_id.to_string().to_ascii_lowercase(),
17 Id::Agent { agent_id } => agent_id.to_string().to_ascii_lowercase(),
18 };
19 let subscriber = subscriber
21 .map(|agent| agent.to_string().to_ascii_lowercase())
22 .unwrap_or_default();
23 let topic = topic.trim_matches('/').to_ascii_lowercase();
25
26 if topic.is_empty() && subscriber.is_empty() {
29 format!("{publisher}\n")
30 } else {
31 format!("{publisher}\n{topic}\n{subscriber}")
33 }
34}
35
36fn extract_key(key: &[u8]) -> Result<(String, AgentId)> {
40 let key = std::str::from_utf8(key).with_context(|| "DB key deserialization failed")?;
41
42 let mut parts = key.splitn(3, '\n');
43 match (parts.next(), parts.next(), parts.next()) {
44 (Some(publisher), Some(topic), Some(s)) => {
45 let subscriber =
46 AgentId::from_str(s).with_context(|| "AgentId deserialization error")?;
47 let full_topic = format!("/{publisher}/{topic}");
48 Ok((full_topic, subscriber))
49 }
50 _ => Err(crate::Error::msg(
51 "SubscriptionHandler: malformed key error",
52 )),
53 }
54}
55
56pub struct SubscriptionHandler<'a, S: Db> {
57 db: &'a S,
58}
59
60impl<'a, S: Db> SubscriptionHandler<'a, S> {
61 pub fn new(db: &'a S) -> Self {
62 Self { db }
63 }
64
65 pub fn init(&self, txn: &mut <S as Db>::RwTx<'_>, introduction: Introduction) -> Result<()> {
67 match introduction.id {
69 Id::Contract { .. } => {} Id::Agent { agent_id } => {
71 for s in introduction.subscriptions {
72 self.subscribe(txn, agent_id, s)?
73 }
74 }
75 }
76 Ok(())
77 }
78
79 pub fn subscribe(
81 &self,
82 txn: &mut <S as Db>::RwTx<'_>,
83 subscriber: AgentId,
84 topic: Topic,
85 ) -> Result<()> {
86 let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
87 let key = generate_key(topic.publisher, topic.topic, Some(subscriber));
88 txn.write(&db_ptr, &key, &topic.method)?;
89 Ok(())
90 }
91
92 pub fn unsubscribe(
94 &self,
95 txn: &mut <S as Db>::RwTx<'_>,
96 subscriber: AgentId,
97 publisher: Id,
98 topic: String,
99 ) -> Result<bool> {
100 let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
101 let key = generate_key(publisher, topic, Some(subscriber));
102 let deleted = txn.delete(&db_ptr, &key).is_ok();
103 Ok(deleted)
104 }
105
106 pub fn get_topic_subscribers(
108 &self,
109 publisher: Id,
110 topic: String,
111 ) -> Result<Vec<(AgentId, String)>> {
112 let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
114 let txn = self.db.begin_ro_txn()?;
115 let mut cursor = txn.ro_cursor(&db_ptr)?;
116
117 let mut subscribers = Vec::new();
118
119 let prefix = generate_key(publisher, topic, None);
121
122 for (key, value) in cursor.iter_from(&prefix) {
123 if !key.starts_with(prefix.as_bytes()) {
125 break;
126 }
127 let topic =
129 String::from_utf8(value.to_vec()).with_context(|| "Failed to deserialize topic")?;
130 let (_, subscriber) = extract_key(key)?;
132 subscribers.push((subscriber, topic));
133 }
134 drop(cursor);
136 Ok(subscribers)
137 }
138
139 pub fn get_subscribers(&self, publisher: Id) -> Result<Vec<(AgentId, String)>> {
141 self.get_topic_subscribers(publisher, String::default())
142 }
143
144 pub fn get_subscriptions(&self, target: AgentId) -> Result<Vec<String>> {
146 let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
148 let txn = self.db.begin_ro_txn()?;
149 let mut cursor = txn.ro_cursor(&db_ptr)?;
150
151 let mut topics = Vec::new();
152
153 for (key, _) in cursor.iter() {
155 let (full_topic, subscriber) = extract_key(key)?;
156 if target != subscriber {
158 continue;
159 }
160 topics.push(full_topic);
162 }
163 drop(cursor);
165 Ok(topics)
166 }
167}
168
169#[cfg(test)]
170mod tests {
171 use crate::db::subscriptions::SubscriptionHandler;
172 use crate::SUBSCRIPTION_REL_SUB_DB;
173 use borderless::common::Id;
174 use borderless::events::Topic;
175 use borderless::{AgentId, ContractId, Result};
176 use borderless_kv_store::backend::lmdb::Lmdb;
177 use borderless_kv_store::{Db, Tx};
178 use tempfile::tempdir;
179
180 const N: usize = 10;
181
182 fn open_tmp_lmdb() -> Lmdb {
183 let tmp_dir = tempdir().unwrap();
184 let env = Lmdb::new(tmp_dir.path(), 1).unwrap();
185 env.create_sub_db(SUBSCRIPTION_REL_SUB_DB).unwrap();
186 env
187 }
188
189 #[test]
190 fn subscription() -> Result<()> {
191 let lmdb = open_tmp_lmdb();
193 let handler = SubscriptionHandler::new(&lmdb);
194 let mut txn = lmdb.begin_rw_txn()?;
195
196 let subscribers: Vec<AgentId> = std::iter::repeat_with(|| AgentId::generate())
198 .take(N)
199 .collect();
200 let publishers: Vec<Id> = std::iter::repeat_with(|| Id::agent(AgentId::generate()))
201 .take(N)
202 .collect();
203 let topic = "MyTopic";
204
205 for i in 0..N {
207 let s = subscribers[i];
208 let p = publishers[i];
209 let topic = Topic::new(p, topic.to_string(), "method".to_string());
210 handler.subscribe(&mut txn, s, topic.clone())?;
211 }
212
213 txn.commit()?;
215
216 for i in 0..N {
218 let s = subscribers[i];
219 let p = publishers[i].to_string();
220
221 let subscriptions = handler.get_subscriptions(s)?;
222 assert_eq!(subscriptions.len(), 1);
223 let full_topic = format!("/{}/{}", p, topic.to_ascii_lowercase());
224 assert_eq!(subscriptions[0], full_topic);
225 }
226 Ok(())
227 }
228
229 #[test]
230 fn unsubscription() -> Result<()> {
231 let lmdb = open_tmp_lmdb();
233 let handler = SubscriptionHandler::new(&lmdb);
234
235 let subscribers: Vec<AgentId> = std::iter::repeat_with(|| AgentId::generate())
237 .take(N)
238 .collect();
239 let publishers: Vec<Id> = std::iter::repeat_with(|| Id::agent(AgentId::generate()))
240 .take(N)
241 .collect();
242 let topic = "MyTopic";
243
244 let mut txn = lmdb.begin_rw_txn()?;
246 for i in 0..N {
247 let topic = Topic::new(publishers[i], topic.to_string(), "method".to_string());
248 handler.subscribe(&mut txn, subscribers[i], topic)?;
250 }
251 txn.commit()?;
252
253 let mut txn = lmdb.begin_rw_txn()?;
255 for i in 0..N {
256 let s = subscribers[i];
257 let p = publishers[i];
258 assert!(handler.unsubscribe(&mut txn, s, p, topic.to_string())?);
260 }
261 Ok(())
262 }
263
264 #[test]
265 fn fetch_topic_subscribers() -> Result<()> {
266 let lmdb = open_tmp_lmdb();
268 let handler = SubscriptionHandler::new(&lmdb);
269
270 let mut subscribers: Vec<AgentId> = std::iter::repeat_with(|| AgentId::generate())
272 .take(N)
273 .collect();
274 let publisher = Id::contract(ContractId::generate());
275 let topic = "tennis";
276
277 let mut txn = lmdb.begin_rw_txn()?;
279 for i in 0..N {
280 let topic = Topic::new(publisher, topic.to_string(), "method".to_string());
281 handler.subscribe(&mut txn, subscribers[i], topic)?;
283 }
284 txn.commit()?;
285
286 let mut output: Vec<AgentId> = handler
288 .get_topic_subscribers(publisher, topic.to_string())?
289 .iter()
290 .map(|(aid, _)| aid)
291 .cloned()
292 .collect();
293 subscribers.sort();
295 output.sort();
296 assert_eq!(subscribers, output, "Mismatch in topic subscribers");
297 Ok(())
298 }
299
300 #[test]
301 fn fetch_subscribers() -> Result<()> {
302 let lmdb = open_tmp_lmdb();
304 let handler = SubscriptionHandler::new(&lmdb);
305
306 let mut subscribers: Vec<AgentId> = std::iter::repeat_with(|| AgentId::generate())
308 .take(N)
309 .collect();
310 let publisher = Id::contract(ContractId::generate());
311 let topics = vec!["Soccer", "Tennis", "Golf", "Basketball", "Football"];
312
313 let mut txn = lmdb.begin_rw_txn()?;
315 for i in 0..N {
316 let topic = Topic::new(publisher, topics[i % 5].to_string(), "method".to_string());
317 handler.subscribe(&mut txn, subscribers[i], topic)?;
319 }
320 txn.commit()?;
321
322 let mut output: Vec<AgentId> = handler
324 .get_subscribers(publisher)?
325 .iter()
326 .map(|(aid, _)| aid)
327 .cloned()
328 .collect();
329 subscribers.sort();
331 output.sort();
332 assert_eq!(subscribers, output, "Mismatch in subscribers");
333 Ok(())
334 }
335
336 #[test]
337 fn fetch_subscriptions() -> Result<()> {
338 let lmdb = open_tmp_lmdb();
340 let handler = SubscriptionHandler::new(&lmdb);
341
342 let subscriber = AgentId::generate();
344 let topics = vec!["Soccer", "Tennis", "Golf", "Basketball", "Football"];
345
346 let mut full_topic: Vec<String> = Vec::new();
347 let mut txn = lmdb.begin_rw_txn()?;
349 for i in 0..N {
350 let p = AgentId::generate();
351 let topic = topics[i % 5].to_string();
352 full_topic.push(format!("/{}/{}", p, topic.to_ascii_lowercase()));
353 let topic = Topic::new(Id::agent(p), topic, "method".to_string());
355 handler.subscribe(&mut txn, subscriber, topic)?;
356 }
357 txn.commit()?;
358
359 let mut output = handler.get_subscriptions(subscriber)?;
361 output.sort();
362 full_topic.sort();
363 assert_eq!(full_topic, output, "Mismatch in subscriptions");
364 Ok(())
365 }
366}