use crate::{Result, SUBSCRIPTION_REL_SUB_DB};
use borderless::common::{Id, Introduction};
use borderless::events::Topic;
use borderless::{AgentId, Context, ContractId};
use borderless_kv_store::{Db, RawWrite, RoCursor, RoTx};
use std::str::FromStr;
/// Builds the database key (or key prefix) for a subscription relation.
///
/// The key consists of three newline-separated fields:
/// `{publisher}\n{topic}\n{subscriber}`. All fields are ASCII-lowercased and
/// the topic has its leading and trailing `/` characters removed.
///
/// * With a subscriber, the result is the full key of one subscription.
/// * With a topic but no subscriber, the result ends in `\n` and is a prefix
///   of every key for that publisher/topic pair.
/// * With neither topic nor subscriber, the result is `{publisher}\n`, a
///   prefix of every key belonging to that publisher.
fn generate_key(publisher: Id, topic: String, subscriber: Option<AgentId>) -> String {
    // Both ID flavours are rendered the same way: their string form, lowercased.
    let publisher_part = match publisher {
        Id::Contract { contract_id } => contract_id.to_string(),
        Id::Agent { agent_id } => agent_id.to_string(),
    }
    .to_ascii_lowercase();

    let topic_part = topic.trim_matches('/').to_ascii_lowercase();

    let subscriber_part = match subscriber {
        Some(agent) => agent.to_string().to_ascii_lowercase(),
        None => String::new(),
    };

    // Nothing but the publisher is known: emit the publisher-wide prefix.
    if topic_part.is_empty() && subscriber_part.is_empty() {
        return format!("{publisher_part}\n");
    }
    format!("{publisher_part}\n{topic_part}\n{subscriber_part}")
}
fn extract_key(key: &[u8]) -> Result<(String, AgentId)> {
let key = std::str::from_utf8(key).with_context(|| "DB key deserialization failed")?;
let mut parts = key.splitn(3, '\n');
match (parts.next(), parts.next(), parts.next()) {
(Some(publisher), Some(topic), Some(s)) => {
let subscriber =
AgentId::from_str(s).with_context(|| "AgentId deserialization error")?;
let full_topic = format!("/{publisher}/{topic}");
Ok((full_topic, subscriber))
}
_ => Err(crate::Error::msg(
"SubscriptionHandler: malformed key error",
)),
}
}
/// Reads and writes subscription relations (which agent subscribed to which
/// publisher/topic) in the sub-database named by `SUBSCRIPTION_REL_SUB_DB`.
///
/// The handler only borrows the database; write operations additionally take
/// a caller-provided read-write transaction.
pub struct SubscriptionHandler<'a, S: Db> {
// Borrowed key-value store that all operations run against.
db: &'a S,
}
impl<'a, S: Db> SubscriptionHandler<'a, S> {
pub fn new(db: &'a S) -> Self {
Self { db }
}
pub fn init(&self, txn: &mut <S as Db>::RwTx<'_>, introduction: Introduction) -> Result<()> {
match introduction.id {
Id::Contract { .. } => {} Id::Agent { agent_id } => {
for s in introduction.subscriptions {
self.subscribe(txn, agent_id, s)?
}
}
}
Ok(())
}
pub fn subscribe(
&self,
txn: &mut <S as Db>::RwTx<'_>,
subscriber: AgentId,
topic: Topic,
) -> Result<()> {
let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
let key = generate_key(topic.publisher, topic.topic, Some(subscriber));
txn.write(&db_ptr, &key, &topic.method)?;
Ok(())
}
pub fn unsubscribe(
&self,
txn: &mut <S as Db>::RwTx<'_>,
subscriber: AgentId,
publisher: Id,
topic: String,
) -> Result<()> {
let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
let key = generate_key(publisher, topic, Some(subscriber));
Ok(txn.delete(&db_ptr, &key)?)
}
pub fn get_topic_subscribers(
&self,
publisher: Id,
topic: String,
) -> Result<Vec<(AgentId, String)>> {
let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
let txn = self.db.begin_ro_txn()?;
let mut cursor = txn.ro_cursor(&db_ptr)?;
let mut subscribers = Vec::new();
let prefix = generate_key(publisher, topic, None);
for (key, value) in cursor.iter_from(&prefix) {
if !key.starts_with(prefix.as_bytes()) {
break;
}
let topic =
String::from_utf8(value.to_vec()).with_context(|| "Failed to deserialize topic")?;
let (_, subscriber) = extract_key(key)?;
subscribers.push((subscriber, topic));
}
drop(cursor);
Ok(subscribers)
}
pub fn get_subscribers(&self, publisher: Id) -> Result<Vec<(AgentId, String)>> {
self.get_topic_subscribers(publisher, String::default())
}
pub fn get_subscriptions(&self, target: AgentId) -> Result<Vec<String>> {
let db_ptr = self.db.open_sub_db(SUBSCRIPTION_REL_SUB_DB)?;
let txn = self.db.begin_ro_txn()?;
let mut cursor = txn.ro_cursor(&db_ptr)?;
let mut topics = Vec::new();
for (key, _) in cursor.iter() {
let (full_topic, subscriber) = extract_key(key)?;
if target != subscriber {
continue;
}
topics.push(full_topic);
}
drop(cursor);
Ok(topics)
}
pub fn unsubscribe_all(&self, txn: &mut <S as Db>::RwTx<'_>, subscriber: Id) -> Result<()> {
let subscriber = match subscriber {
Id::Contract { .. } => return Ok(()), Id::Agent { agent_id } => agent_id,
};
let subscriptions = self.get_subscriptions(subscriber)?;
for s in subscriptions {
let mut parts = s.trim_matches('/').splitn(2, '/');
let p = parts.next().expect("Malformed key");
let topic = parts.next().expect("Malformed key").to_string();
let publisher = if let Ok(cid) = ContractId::from_str(p) {
Id::from(cid)
} else if let Ok(aid) = AgentId::from_str(p) {
Id::from(aid)
} else {
return Err(crate::error::Error::msg("Invalid publisher"));
};
self.unsubscribe(txn, subscriber, publisher, topic)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use crate::db::subscriptions::SubscriptionHandler;
    use crate::SUBSCRIPTION_REL_SUB_DB;
    use borderless::common::Id;
    use borderless::events::Topic;
    use borderless::{AgentId, ContractId, Result};
    use borderless_kv_store::backend::lmdb::Lmdb;
    use borderless_kv_store::{Db, Tx};
    use tempfile::{tempdir, TempDir};

    /// Number of subscribers (and, where applicable, publishers) per test.
    const N: usize = 10;

    /// Opens a fresh LMDB environment containing the subscription sub-database.
    ///
    /// The `TempDir` guard is returned together with the environment so the
    /// directory is not deleted while the environment is still in use.
    fn open_tmp_lmdb() -> (TempDir, Lmdb) {
        let tmp_dir = tempdir().unwrap();
        let env = Lmdb::new(tmp_dir.path(), 1).unwrap();
        env.create_sub_db(SUBSCRIPTION_REL_SUB_DB).unwrap();
        (tmp_dir, env)
    }

    /// Generates `N` random agent IDs.
    fn generate_agents() -> Vec<AgentId> {
        std::iter::repeat_with(AgentId::generate).take(N).collect()
    }

    /// Generates `N` random publishers, all of them agents.
    fn generate_agent_publishers() -> Vec<Id> {
        std::iter::repeat_with(|| Id::agent(AgentId::generate()))
            .take(N)
            .collect()
    }

    #[test]
    fn subscription() -> Result<()> {
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);
        let subscribers = generate_agents();
        let publishers = generate_agent_publishers();
        let topic = "MyTopic";

        let mut txn = lmdb.begin_rw_txn()?;
        for (&s, &p) in subscribers.iter().zip(&publishers) {
            let topic = Topic::new(p, topic.to_string(), "method".to_string());
            handler.subscribe(&mut txn, s, topic)?;
        }
        txn.commit()?;

        for (&s, p) in subscribers.iter().zip(&publishers) {
            let subscriptions = handler.get_subscriptions(s)?;
            assert_eq!(subscriptions.len(), 1);
            // Topics are stored lowercased.
            let full_topic = format!("/{}/{}", p, topic.to_ascii_lowercase());
            assert_eq!(subscriptions[0], full_topic);
        }
        Ok(())
    }

    #[test]
    fn unsubscription() -> Result<()> {
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);
        let subscribers = generate_agents();
        let publishers = generate_agent_publishers();
        let topic = "MyTopic";

        let mut txn = lmdb.begin_rw_txn()?;
        for (&s, &p) in subscribers.iter().zip(&publishers) {
            let topic = Topic::new(p, topic.to_string(), "method".to_string());
            handler.subscribe(&mut txn, s, topic)?;
        }
        txn.commit()?;

        let mut txn = lmdb.begin_rw_txn()?;
        for (&s, &p) in subscribers.iter().zip(&publishers) {
            handler.unsubscribe(&mut txn, s, p, topic.to_string())?;
        }
        txn.commit()?;

        // After the removal is committed, no subscription may be left.
        for &s in &subscribers {
            assert!(
                handler.get_subscriptions(s)?.is_empty(),
                "Subscription still present after unsubscribe"
            );
        }
        Ok(())
    }

    #[test]
    fn fetch_topic_subscribers() -> Result<()> {
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);
        let mut subscribers = generate_agents();
        let publisher = Id::contract(ContractId::generate());
        let topic = "tennis";

        let mut txn = lmdb.begin_rw_txn()?;
        for &s in &subscribers {
            let topic = Topic::new(publisher, topic.to_string(), "method".to_string());
            handler.subscribe(&mut txn, s, topic)?;
        }
        txn.commit()?;

        let mut output: Vec<AgentId> = handler
            .get_topic_subscribers(publisher, topic.to_string())?
            .into_iter()
            .map(|(aid, _)| aid)
            .collect();
        subscribers.sort();
        output.sort();
        assert_eq!(subscribers, output, "Mismatch in topic subscribers");
        Ok(())
    }

    #[test]
    fn fetch_subscribers() -> Result<()> {
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);
        let mut subscribers = generate_agents();
        let publisher = Id::contract(ContractId::generate());
        let topics = ["Soccer", "Tennis", "Golf", "Basketball", "Football"];

        let mut txn = lmdb.begin_rw_txn()?;
        for (i, &s) in subscribers.iter().enumerate() {
            let name = topics[i % topics.len()];
            let topic = Topic::new(publisher, name.to_string(), "method".to_string());
            handler.subscribe(&mut txn, s, topic)?;
        }
        txn.commit()?;

        let mut output: Vec<AgentId> = handler
            .get_subscribers(publisher)?
            .into_iter()
            .map(|(aid, _)| aid)
            .collect();
        subscribers.sort();
        output.sort();
        assert_eq!(subscribers, output, "Mismatch in subscribers");
        Ok(())
    }

    #[test]
    fn fetch_subscriptions() -> Result<()> {
        let (_tmp_dir, lmdb) = open_tmp_lmdb();
        let handler = SubscriptionHandler::new(&lmdb);
        let subscriber = AgentId::generate();
        let topics = ["Soccer", "Tennis", "Golf", "Basketball", "Football"];
        let mut full_topic: Vec<String> = Vec::with_capacity(N);

        let mut txn = lmdb.begin_rw_txn()?;
        for i in 0..N {
            let p = AgentId::generate();
            let topic = topics[i % topics.len()].to_string();
            full_topic.push(format!("/{}/{}", p, topic.to_ascii_lowercase()));
            let topic = Topic::new(Id::agent(p), topic, "method".to_string());
            handler.subscribe(&mut txn, subscriber, topic)?;
        }
        txn.commit()?;

        let mut output = handler.get_subscriptions(subscriber)?;
        output.sort();
        full_topic.sort();
        assert_eq!(full_topic, output, "Mismatch in subscriptions");
        Ok(())
    }
}