use crate::{
swarm::BanyanStore,
trees::{
query::{LamportQuery, TagExprQuery, TimeQuery},
tags::{ScopedTag, ScopedTagSet, TagScope},
},
};
use ax_types::{app_id, tag, tags, Payload};
use futures::{StreamExt, TryFutureExt};
use libipld::cid::Cid;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{collections::BTreeMap, sync::Arc};
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum PersistenceLevel {
Ephemeral,
Prefetch,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NameRecordEvent {
Add {
name: ActyxName,
#[serde(with = "crate::util::serde_str")]
cid: Cid,
public: bool,
level: PersistenceLevel,
},
Remove {
name: ActyxName,
},
}
#[derive(Debug, Clone, Copy)]
pub struct NameRecord {
pub cid: Cid,
#[allow(dead_code)]
pub level: PersistenceLevel,
pub public: bool,
}
#[derive(
Clone,
Debug,
Ord,
PartialOrd,
PartialEq,
Eq,
Serialize,
Deserialize,
derive_more::Display,
derive_more::Deref,
derive_more::From,
)]
#[from(forward)]
pub struct ActyxName(String);
#[derive(Clone)]
pub struct ActyxNamingService {
state: Arc<Mutex<BTreeMap<ActyxName, NameRecord>>>,
store: BanyanStore,
}
fn mk_query() -> TagExprQuery {
let mut tags: ScopedTagSet = tags!("ans").into();
tags.insert(ScopedTag::new(TagScope::Internal, tag!("app_id:com.actyx")));
TagExprQuery::new(vec![tags], LamportQuery::all(), TimeQuery::all())
}
impl ActyxNamingService {
pub fn new(store: BanyanStore) -> Self {
let state = Arc::new(Mutex::new(BTreeMap::new()));
let state_c = state.clone();
let store_c = store.clone();
tokio::spawn(async move {
let mut stream = store_c.stream_filtered_stream_ordered(mk_query());
while let Some(event) = stream.next().await {
let event = match event {
Ok(event) => event,
Err(err) => {
tracing::warn!(%err, "Error streaming events");
continue;
}
};
match serde_cbor::from_slice(event.2.as_slice()) {
Ok(NameRecordEvent::Add {
name,
cid,
level,
public,
}) => {
tracing::debug!(%name, %cid, "Record Addition");
if let PersistenceLevel::Prefetch = level {
let name_c = name.clone();
tokio::spawn(
store_c
.ipfs()
.clone()
.sync(&cid, store_c.ipfs().peers())
.map_err(move |e| {
tracing::error!(%cid, name=%name_c, error=%e, "Error prefetching");
}),
);
if let Err(e) = store_c.ipfs().alias(&*name, Some(&cid)) {
tracing::error!(%name, error=%e, "Error aliasing");
}
}
state_c.lock().insert(name, NameRecord { cid, level, public });
}
Ok(NameRecordEvent::Remove { name }) => {
tracing::debug!(%name, "Record removal");
let _ = store_c.ipfs().alias(&*name, None);
state_c.lock().remove(&name);
}
Err(e) => {
tracing::error!(error=%e, "Error decoding ANS record");
}
};
}
});
Self { store, state }
}
pub async fn set(
&self,
name: ActyxName,
cid: Cid,
level: PersistenceLevel,
public: bool,
) -> anyhow::Result<Option<NameRecord>> {
let record = NameRecordEvent::Add {
name: name.clone(),
cid,
level,
public,
};
self.store
.append(
app_id!("com.actyx"),
vec![(
tags!("ans"),
Payload::compact(&record).expect("CBOR Serialization works"),
)],
)
.await?;
Ok(self.state.lock().insert(name, NameRecord { cid, level, public }))
}
pub fn get(&self, name: impl Into<ActyxName>) -> Option<NameRecord> {
self.state.lock().get(&name.into()).cloned()
}
pub async fn remove(&self, name: ActyxName) -> anyhow::Result<Option<NameRecord>> {
let record = NameRecordEvent::Remove { name: name.clone() };
self.store
.append(
app_id!("com.actyx"),
vec![(
tags!("ans"),
Payload::compact(&record).expect("CBOR Serialization works"),
)],
)
.await?;
Ok(self.state.lock().remove(&name))
}
}