borderless_runtime/db/
subscriptions.rs

1use crate::{Result, SUBSCRIPTION_REL_SUB_DB};
2use borderless::common::{Id, Introduction};
3use borderless::events::Topic;
4use borderless::{AgentId, Context};
5use borderless_kv_store::{Db, RawWrite, RoCursor, RoTx};
6use std::str::FromStr;
7
8/// Generates a DB key from a publisher, subscriber and topic
9///
10/// Current DB relationship is: publisher | topic | subscriber => method_name
11///
12/// For generating a subscribers look-up key, leave the subscriber field as None
13fn generate_key(publisher: Id, topic: String, subscriber: Option<AgentId>) -> String {
14    // Publishers can be either Contracts or Agents
15    let publisher = match publisher {
16        Id::Contract { contract_id } => contract_id.to_string().to_ascii_lowercase(),
17        Id::Agent { agent_id } => agent_id.to_string().to_ascii_lowercase(),
18    };
19    // Subscribers are only Agents
20    let subscriber = subscriber
21        .map(|agent| agent.to_string().to_ascii_lowercase())
22        .unwrap_or_default();
23    // Remove leading and trailing slashes
24    let topic = topic.trim_matches('/').to_ascii_lowercase();
25
26    // NOTE: when building a look-up key without a topic, do not write
27    // additional delimiters as they interfere with our cursor logic
28    if topic.is_empty() && subscriber.is_empty() {
29        format!("{publisher}\n")
30    } else {
31        // TODO Forbid creating topics containing the newline character
32        format!("{publisher}\n{topic}\n{subscriber}")
33    }
34}
35
36/// Extracts the full topic (publisher + topic) and subscriber from a DB key
37///
38/// Returns a tuple, or an error if the deserialization fails
39fn extract_key(key: &[u8]) -> Result<(String, AgentId)> {
40    let key = std::str::from_utf8(key).with_context(|| "DB key deserialization failed")?;
41
42    let mut parts = key.splitn(3, '\n');
43    match (parts.next(), parts.next(), parts.next()) {
44        (Some(publisher), Some(topic), Some(s)) => {
45            let subscriber =
46                AgentId::from_str(s).with_context(|| "AgentId deserialization error")?;
47            let full_topic = format!("/{publisher}/{topic}");
48            Ok((full_topic, subscriber))
49        }
50        _ => Err(crate::Error::msg(
51            "SubscriptionHandler: malformed key error",
52        )),
53    }
54}
55
56pub struct SubscriptionHandler<'a, S: Db> {
57    db: &'a S,
58}
59
60impl<'a, S: Db> SubscriptionHandler<'a, S> {
61    pub fn new(db: &'a S) -> Self {
62        Self { db }
63    }
64
65    /// Loads the subscriptions from a software agent introduction
66    pub fn init(&self, txn: &mut <S as Db>::RwTx<'_>, introduction: Introduction) -> Result<()> {
67        // Write static subscriptions
68        match introduction.id {
69            Id::Contract { .. } => {} // Not applicable
70            Id::Agent { agent_id } => {
71                for s in introduction.subscriptions {
72                    self.subscribe(txn, agent_id, s)?
73                }
74            }
75        }
76        Ok(())
77    }
78
79    /// Subscribes an ['AgentId'] to a topic from a specific publisher
80    pub fn subscribe(
81        &self,
82        txn: &mut <S as Db>::RwTx<'_>,
83        subscriber: AgentId,
84        topic: Topic,
85    ) -> Result<()> {
86        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
87        let key = generate_key(topic.publisher, topic.topic, Some(subscriber));
88        txn.write(&db_ptr, &key, &topic.method)?;
89        Ok(())
90    }
91
92    /// Unsubscribes an ['AgentId'] from a topic
93    pub fn unsubscribe(
94        &self,
95        txn: &mut <S as Db>::RwTx<'_>,
96        subscriber: AgentId,
97        publisher: Id,
98        topic: String,
99    ) -> Result<bool> {
100        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
101        let key = generate_key(publisher, topic, Some(subscriber));
102        let deleted = txn.delete(&db_ptr, &key).is_ok();
103        Ok(deleted)
104    }
105
106    /// Fetches the active subscribers for a full topic (publisher + topic)
107    pub fn get_topic_subscribers(
108        &self,
109        publisher: Id,
110        topic: String,
111    ) -> Result<Vec<(AgentId, String)>> {
112        // Setup DB cursor
113        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
114        let txn = self.db.begin_ro_txn()?;
115        let mut cursor = txn.ro_cursor(&db_ptr)?;
116
117        let mut subscribers = Vec::new();
118
119        // Use an efficient look-up key
120        let prefix = generate_key(publisher, topic, None);
121
122        for (key, value) in cursor.iter_from(&prefix) {
123            // Stop iterating when prefix no longer matches
124            if !key.starts_with(prefix.as_bytes()) {
125                break;
126            }
127            // Decode method_name
128            let topic =
129                String::from_utf8(value.to_vec()).with_context(|| "Failed to deserialize topic")?;
130            // Push subscriber to vector
131            let (_, subscriber) = extract_key(key)?;
132            subscribers.push((subscriber, topic));
133        }
134        // Free up resources
135        drop(cursor);
136        Ok(subscribers)
137    }
138
139    /// Fetches all active subscribers to topics from the given publisher
140    pub fn get_subscribers(&self, publisher: Id) -> Result<Vec<(AgentId, String)>> {
141        self.get_topic_subscribers(publisher, String::default())
142    }
143
144    /// Fetches all active subscriptions for the specified ['AgentId']
145    pub fn get_subscriptions(&self, target: AgentId) -> Result<Vec<String>> {
146        // Setup DB cursor
147        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
148        let txn = self.db.begin_ro_txn()?;
149        let mut cursor = txn.ro_cursor(&db_ptr)?;
150
151        let mut topics = Vec::new();
152
153        // TODO Avoid iterating all the keys?
154        for (key, _) in cursor.iter() {
155            let (full_topic, subscriber) = extract_key(key)?;
156            // Ignore subscription not related with target
157            if target != subscriber {
158                continue;
159            }
160            // Push the full topic
161            topics.push(full_topic);
162        }
163        // Free up resources
164        drop(cursor);
165        Ok(topics)
166    }
167}
168
#[cfg(test)]
mod tests {
    use crate::db::subscriptions::SubscriptionHandler;
    use crate::SUBSCRIPTION_REL_SUB_DB;
    use borderless::common::Id;
    use borderless::events::Topic;
    use borderless::{AgentId, ContractId, Result};
    use borderless_kv_store::backend::lmdb::Lmdb;
    use borderless_kv_store::{Db, Tx};
    use tempfile::{tempdir, TempDir};

    /// Number of subscriptions generated by each test
    const N: usize = 10;

    /// Opens an LMDB environment in a fresh temporary directory and creates
    /// the subscription sub-database in it.
    ///
    /// The directory guard is returned together with the environment: the
    /// directory is removed as soon as the guard is dropped, so the caller
    /// has to keep it alive for as long as the environment is in use.
    fn open_tmp_lmdb() -> (TempDir, Lmdb) {
        let tmp_dir = tempdir().unwrap();
        let env = Lmdb::new(tmp_dir.path(), 1).unwrap();
        env.create_sub_db(SUBSCRIPTION_REL_SUB_DB).unwrap();
        (tmp_dir, env)
    }

    #[test]
    fn subscription() -> Result<()> {
        // Setup dummy DB (the guard is bound first, so it is dropped last)
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);
        let mut txn = lmdb.begin_rw_txn()?;

        // Setup: both subscribers and publishers are sw-agents
        let subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publishers: Vec<Id> = std::iter::repeat_with(|| Id::agent(AgentId::generate()))
            .take(N)
            .collect();
        let topic_name = "MyTopic";

        // Generate subscriptions: the i-th subscriber follows the i-th publisher
        for (&s, &p) in subscribers.iter().zip(&publishers) {
            let topic = Topic::new(p, topic_name.to_string(), "method".to_string());
            handler.subscribe(&mut txn, s, topic)?;
        }

        // Commit changes
        txn.commit()?;

        // Check subscriptions are present
        for (&s, &p) in subscribers.iter().zip(&publishers) {
            let subscriptions = handler.get_subscriptions(s)?;
            assert_eq!(subscriptions.len(), 1);
            // Topics are stored in lowercase
            let full_topic = format!("/{}/{}", p, topic_name.to_ascii_lowercase());
            assert_eq!(subscriptions[0], full_topic);
        }
        Ok(())
    }

    #[test]
    fn unsubscription() -> Result<()> {
        // Setup dummy DB (the guard is bound first, so it is dropped last)
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: both subscribers and publishers are sw-agents
        let subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publishers: Vec<Id> = std::iter::repeat_with(|| Id::agent(AgentId::generate()))
            .take(N)
            .collect();
        let topic_name = "MyTopic";

        // Generate subscriptions
        let mut txn = lmdb.begin_rw_txn()?;
        for (&s, &p) in subscribers.iter().zip(&publishers) {
            let topic = Topic::new(p, topic_name.to_string(), "method".to_string());
            // Subscribe to topic
            handler.subscribe(&mut txn, s, topic)?;
        }
        txn.commit()?;

        // Check that unsubscriptions are successful
        let mut txn = lmdb.begin_rw_txn()?;
        for (&s, &p) in subscribers.iter().zip(&publishers) {
            // Unsubscribe and check result is true
            assert!(handler.unsubscribe(&mut txn, s, p, topic_name.to_string())?);
        }
        Ok(())
    }

    #[test]
    fn fetch_topic_subscribers() -> Result<()> {
        // Setup dummy DB (the guard is bound first, so it is dropped last)
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: subscribers are sw-agents and publisher is a smart-contract
        let mut subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publisher = Id::contract(ContractId::generate());
        let topic_name = "tennis";

        // Generate subscriptions
        let mut txn = lmdb.begin_rw_txn()?;
        for &s in &subscribers {
            let topic = Topic::new(publisher, topic_name.to_string(), "method".to_string());
            // Subscribe to topic
            handler.subscribe(&mut txn, s, topic)?;
        }
        txn.commit()?;

        // Fetch topic subscribers
        let mut output: Vec<AgentId> = handler
            .get_topic_subscribers(publisher, topic_name.to_string())?
            .into_iter()
            .map(|(aid, _)| aid)
            .collect();
        // Check output
        subscribers.sort();
        output.sort();
        assert_eq!(subscribers, output, "Mismatch in topic subscribers");
        Ok(())
    }

    #[test]
    fn fetch_subscribers() -> Result<()> {
        // Setup dummy DB (the guard is bound first, so it is dropped last)
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: subscribers are sw-agents and publisher is a smart-contract
        let mut subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publisher = Id::contract(ContractId::generate());
        let topics = ["Soccer", "Tennis", "Golf", "Basketball", "Football"];

        // Generate subscriptions, handing out the topics round-robin
        let mut txn = lmdb.begin_rw_txn()?;
        for (&s, name) in subscribers.iter().zip(topics.iter().copied().cycle()) {
            let topic = Topic::new(publisher, name.to_string(), "method".to_string());
            // Subscribe to topic
            handler.subscribe(&mut txn, s, topic)?;
        }
        txn.commit()?;

        // Fetch subscribers
        let mut output: Vec<AgentId> = handler
            .get_subscribers(publisher)?
            .into_iter()
            .map(|(aid, _)| aid)
            .collect();
        // Check output
        subscribers.sort();
        output.sort();
        assert_eq!(subscribers, output, "Mismatch in subscribers");
        Ok(())
    }

    #[test]
    fn fetch_subscriptions() -> Result<()> {
        // Setup dummy DB (the guard is bound first, so it is dropped last)
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: subscriber is a sw-agent and publishers are sw-agents too
        let subscriber = AgentId::generate();
        let topics = ["Soccer", "Tennis", "Golf", "Basketball", "Football"];

        let mut full_topics: Vec<String> = Vec::with_capacity(N);
        // Generate subscriptions: one fresh publisher per subscription, with
        // the topics handed out round-robin
        let mut txn = lmdb.begin_rw_txn()?;
        for name in topics.iter().copied().cycle().take(N) {
            let p = AgentId::generate();
            // Topics are stored in lowercase
            full_topics.push(format!("/{}/{}", p, name.to_ascii_lowercase()));
            // Subscribe to topic
            let topic = Topic::new(Id::agent(p), name.to_string(), "method".to_string());
            handler.subscribe(&mut txn, subscriber, topic)?;
        }
        txn.commit()?;

        // Fetch subscriptions
        let mut output = handler.get_subscriptions(subscriber)?;
        output.sort();
        full_topics.sort();
        assert_eq!(full_topics, output, "Mismatch in subscriptions");
        Ok(())
    }
}