Skip to main content

borderless_runtime/db/
subscriptions.rs

1use crate::{Result, SUBSCRIPTION_REL_SUB_DB};
2use borderless::common::{Id, Introduction};
3use borderless::events::Topic;
4use borderless::{AgentId, Context};
5use borderless_kv_store::{Db, RawWrite, RoCursor, RoTx, Tx};
6use std::str::FromStr;
7
8/// Generates a DB key from a publisher, subscriber and topic
9///
10/// Current DB relationship is: publisher | topic | subscriber => method_name
11///
12/// For generating a subscribers look-up key, leave the subscriber field as None
13fn generate_key(publisher: Id, topic: String, subscriber: Option<AgentId>) -> String {
14    // Publishers can be either Contracts or Agents
15    let publisher = match publisher {
16        Id::Contract { contract_id } => contract_id.to_string().to_ascii_lowercase(),
17        Id::Agent { agent_id } => agent_id.to_string().to_ascii_lowercase(),
18    };
19    // Subscribers are only Agents
20    let subscriber = subscriber
21        .map(|agent| agent.to_string().to_ascii_lowercase())
22        .unwrap_or_default();
23    // Remove leading and trailing slashes
24    let topic = topic.trim_matches('/').to_ascii_lowercase();
25
26    //NOTE: in look-up keys the subscriber must be empty
27    // The unused delimiters are removed to avoid interferences with the DB cursor
28    match (topic.is_empty(), subscriber.is_empty()) {
29        (true, true) => format!("{publisher}\n"),
30        (false, true) => format!("{publisher}\n{topic}\n"),
31        _ => format!("{publisher}\n{topic}\n{subscriber}"),
32    }
33}
34
35/// Extracts the topic and subscriber from a DB entry
36///
37/// Returns a tuple, or an error if the deserialization fails
38fn extract_entry(key: &[u8], value: &[u8]) -> Result<(Topic, AgentId)> {
39    let key = std::str::from_utf8(key).with_context(|| "DB key deserialization failed")?;
40    let method = std::str::from_utf8(value).with_context(|| "DB value deserialization failed")?;
41
42    let mut parts = key.splitn(3, '\n');
43    match (parts.next(), parts.next(), parts.next()) {
44        (Some(p), Some(topic), Some(s)) => {
45            // Process subscriber
46            let subscriber = AgentId::from_str(s).with_context(|| "Invalid subscriber")?;
47            // Process publisher
48            let publisher = p.parse().with_context(|| "Invalid publisher")?;
49            Ok((Topic::new(publisher, topic, method), subscriber))
50        }
51        _ => Err(crate::Error::msg("Malformed key error")),
52    }
53}
54
55pub struct SubscriptionHandler<'a, S: Db> {
56    db: &'a S,
57}
58
59impl<'a, S: Db> SubscriptionHandler<'a, S> {
60    pub fn new(db: &'a S) -> Self {
61        Self { db }
62    }
63
64    /// Loads the subscriptions from a software agent introduction
65    pub fn init(&self, txn: &mut <S as Db>::RwTx<'_>, introduction: Introduction) -> Result<()> {
66        // Write static subscriptions
67        match introduction.id {
68            Id::Contract { .. } => {} // Not applicable
69            Id::Agent { agent_id } => {
70                for s in introduction.subscriptions {
71                    self.subscribe_txn(txn, agent_id, s)?
72                }
73            }
74        }
75        Ok(())
76    }
77
78    /// Subscribes an ['AgentId'] to a topic from a specific publisher
79    ///
80    /// The changes are automatically commited to DB
81    pub fn subscribe(&self, subscriber: AgentId, topic: Topic) -> Result<()> {
82        let mut txn = self.db.begin_rw_txn()?;
83        self.subscribe_txn(&mut txn, subscriber, topic)?;
84        Ok(txn.commit()?)
85    }
86
87    /// Subscribes an ['AgentId'] to a topic from a specific publisher
88    ///
89    /// The user is responsible for commiting the changes to DB
90    fn subscribe_txn(
91        &self,
92        txn: &mut <S as Db>::RwTx<'_>,
93        subscriber: AgentId,
94        topic: Topic,
95    ) -> Result<()> {
96        // Setup DB access
97        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
98        // Generate DB key
99        let key = generate_key(topic.publisher, topic.topic, Some(subscriber));
100        txn.write(&db_ptr, &key, &topic.method)?;
101        Ok(())
102    }
103
104    /// Unsubscribes an ['AgentId'] from a topic
105    ///
106    /// The changes are automatically commited to DB
107    pub fn unsubscribe(&self, subscriber: AgentId, topic: Topic) -> Result<()> {
108        let mut txn = self.db.begin_rw_txn()?;
109        self.unsubscribe_txn(&mut txn, subscriber, topic)?;
110        Ok(txn.commit()?)
111    }
112
113    /// Unsubscribes an ['AgentId'] from a topic
114    ///
115    /// The user is responsible for commiting the changes to DB
116    fn unsubscribe_txn(
117        &self,
118        txn: &mut <S as Db>::RwTx<'_>,
119        subscriber: AgentId,
120        topic: Topic,
121    ) -> Result<()> {
122        // Setup DB access
123        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
124        // Generate DB key
125        let key = generate_key(topic.publisher, topic.topic, Some(subscriber));
126        Ok(txn.delete(&db_ptr, &key)?)
127    }
128
129    /// Fetches the active subscribers for a full topic (publisher + topic)
130    pub fn get_topic_subscribers(
131        &self,
132        publisher: Id,
133        topic: String,
134    ) -> Result<Vec<(AgentId, String)>> {
135        // Setup DB cursor
136        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
137        let txn = self.db.begin_ro_txn()?;
138        let mut cursor = txn.ro_cursor(&db_ptr)?;
139
140        let mut subscribers = Vec::new();
141
142        // Use an efficient look-up key
143        let prefix = generate_key(publisher, topic, None);
144
145        for (key, value) in cursor.iter_from(&prefix) {
146            // Stop iterating when prefix no longer matches
147            if !key.starts_with(prefix.as_bytes()) {
148                break;
149            }
150            let (topic, subscriber) = extract_entry(key, value)?;
151            // Push the tuple
152            subscribers.push((subscriber, topic.method));
153        }
154        // Free up resources
155        drop(cursor);
156        Ok(subscribers)
157    }
158
159    /// Fetches all active subscriptions for the specified ['AgentId']
160    pub fn get_subscriptions(&self, target: AgentId) -> Result<Vec<Topic>> {
161        // Setup DB cursor
162        let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
163        let txn = self.db.begin_ro_txn()?;
164        let mut cursor = txn.ro_cursor(&db_ptr)?;
165
166        let mut topics = Vec::new();
167        for (key, value) in cursor.iter() {
168            let (topic, subscriber) = extract_entry(key, value)?;
169            // Ignore subscription not related with target
170            if target != subscriber {
171                continue;
172            }
173            // Push the topic
174            topics.push(topic);
175        }
176        // Free up resources
177        drop(cursor);
178        Ok(topics)
179    }
180
181    pub fn unsubscribe_all(&self, txn: &mut <S as Db>::RwTx<'_>, subscriber: Id) -> Result<()> {
182        let subscriber = match subscriber {
183            Id::Contract { .. } => return Ok(()), // Not applicable
184            Id::Agent { agent_id } => agent_id,
185        };
186        // Fetch active subscriptions
187        let subscriptions = self.get_subscriptions(subscriber)?;
188        // Unsubscribe from each topic
189        for topic in subscriptions {
190            self.unsubscribe_txn(txn, subscriber, topic)?;
191        }
192        Ok(())
193    }
194}
195
#[cfg(test)]
mod tests {
    use crate::db::subscriptions::SubscriptionHandler;
    use crate::SUBSCRIPTION_REL_SUB_DB;
    use borderless::common::Id;
    use borderless::events::Topic;
    use borderless::{AgentId, ContractId, Result};
    use borderless_kv_store::backend::lmdb::Lmdb;
    use borderless_kv_store::Db;
    use tempfile::{tempdir, TempDir};

    /// Number of subscribers / publishers generated per test
    const N: usize = 10;

    /// Opens a throw-away LMDB environment with the subscription sub-database created.
    ///
    /// The [`TempDir`] guard is returned together with the environment because the
    /// directory is deleted as soon as the guard is dropped; callers must keep it alive
    /// for as long as they use the DB. The guard comes first in the tuple so that, when
    /// destructured as `let (_tmp_dir, lmdb) = ...`, the environment is dropped before
    /// the directory is removed.
    fn open_tmp_lmdb() -> (TempDir, Lmdb) {
        let tmp_dir = tempdir().unwrap();
        let env = Lmdb::new(tmp_dir.path(), 1).unwrap();
        env.create_sub_db(SUBSCRIPTION_REL_SUB_DB).unwrap();
        (tmp_dir, env)
    }

    /// Every subscriber sees exactly the one subscription it registered.
    #[test]
    fn subscription() -> Result<()> {
        // Setup dummy DB
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: subscribers are sw-agents and publishers are smart-contracts
        let subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publishers: Vec<Id> = std::iter::repeat_with(|| Id::contract(ContractId::generate()))
            .take(N)
            .collect();
        let topic = "MyTopic";

        // Generate subscriptions
        for (subscriber, publisher) in subscribers.iter().zip(&publishers) {
            let topic = Topic::new(*publisher, topic.to_string(), "method".to_string());
            handler.subscribe(*subscriber, topic)?;
        }

        // Check subscriptions are present
        for (subscriber, publisher) in subscribers.iter().zip(&publishers) {
            let subscriptions = handler.get_subscriptions(*subscriber)?;
            assert_eq!(subscriptions.len(), 1);
            assert_eq!(subscriptions[0].publisher, *publisher);
            // Topics are stored lower-cased
            assert_eq!(subscriptions[0].topic, topic.to_ascii_lowercase());
        }
        Ok(())
    }

    /// After unsubscribing, no publisher has subscribers left for the topic.
    #[test]
    fn unsubscription() -> Result<()> {
        // Setup dummy DB
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: both subscribers and publishers are sw-agents
        let subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publishers: Vec<Id> = std::iter::repeat_with(|| Id::agent(AgentId::generate()))
            .take(N)
            .collect();
        let topic = "MyTopic";

        // Generate subscriptions
        for (subscriber, publisher) in subscribers.iter().zip(&publishers) {
            let topic = Topic::new(*publisher, topic.to_string(), "method".to_string());
            handler.subscribe(*subscriber, topic)?;
        }

        // Unsubscribe from every topic; the method name is irrelevant for removal
        for (subscriber, publisher) in subscribers.iter().zip(&publishers) {
            let topic = Topic::new(*publisher, topic.to_string(), String::default());
            handler.unsubscribe(*subscriber, topic)?;
        }

        // All subscriptions must be gone
        for publisher in publishers {
            assert!(handler
                .get_topic_subscribers(publisher, topic.to_string())?
                .is_empty());
        }
        Ok(())
    }

    /// Looking up a publisher + topic returns every agent subscribed to it.
    #[test]
    fn fetch_topic_subscribers() -> Result<()> {
        // Setup dummy DB
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: subscribers are sw-agents and publisher is a smart-contract
        let mut subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publisher = Id::contract(ContractId::generate());
        let topic = "tennis";

        // Generate subscriptions
        for subscriber in &subscribers {
            let topic = Topic::new(publisher, topic.to_string(), "method".to_string());
            handler.subscribe(*subscriber, topic)?;
        }

        // Fetch topic subscribers
        let mut output: Vec<AgentId> = handler
            .get_topic_subscribers(publisher, topic.to_string())?
            .into_iter()
            .map(|(agent, _)| agent)
            .collect();
        // Check output
        subscribers.sort();
        output.sort();
        assert_eq!(subscribers, output, "Mismatch in topic subscribers");
        Ok(())
    }

    /// Looking up a publisher with an empty topic returns the subscribers of all its topics.
    #[test]
    fn fetch_subscribers() -> Result<()> {
        // Setup dummy DB
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: subscribers are sw-agents and publisher is a smart-contract
        let mut subscribers: Vec<AgentId> = std::iter::repeat_with(AgentId::generate)
            .take(N)
            .collect();
        let publisher = Id::contract(ContractId::generate());
        let topics = ["Soccer", "Tennis", "Golf", "Basketball", "Football"];

        // Generate subscriptions, cycling through the topics
        for (subscriber, name) in subscribers.iter().zip(topics.iter().cycle()) {
            let topic = Topic::new(publisher, name.to_string(), "method".to_string());
            handler.subscribe(*subscriber, topic)?;
        }

        // Fetch subscribers
        let mut output: Vec<AgentId> = handler
            .get_topic_subscribers(publisher, String::default())?
            .into_iter()
            .map(|(agent, _)| agent)
            .collect();
        // Check output
        subscribers.sort();
        output.sort();
        assert_eq!(subscribers, output, "Mismatch in subscribers");
        Ok(())
    }

    /// An agent subscribed to several publishers gets all of its subscriptions back.
    #[test]
    fn fetch_subscriptions() -> Result<()> {
        // Setup dummy DB
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);

        // Setup: subscriber is a sw-agent and publishers are smart-contracts
        let subscriber = AgentId::generate();
        let topics = ["Soccer", "Tennis", "Golf", "Basketball", "Football"];

        let mut expected: Vec<Topic> = Vec::with_capacity(N);
        // Generate subscriptions, one freshly generated publisher per subscription
        for name in topics.iter().cycle().take(N) {
            let publisher = Id::contract(ContractId::generate());
            // Topics are stored lower-cased, so build the expectation the same way
            let topic = Topic::new(publisher, name.to_ascii_lowercase(), "method".to_string());
            handler.subscribe(subscriber, topic.clone())?;
            expected.push(topic);
        }

        // Fetch subscriptions
        let output = handler.get_subscriptions(subscriber)?;
        // Without the length check an empty result would pass the containment check below
        assert_eq!(output.len(), expected.len(), "Mismatch in subscriptions");
        for topic in output {
            assert!(expected.contains(&topic), "Mismatch in subscriptions");
        }
        Ok(())
    }
}