use std::collections::HashMap;
use tracing::debug;
use veilid_core::{
CryptoTyped, DHTRecordDescriptor, DHTSchemaDFLT, KeyPair, RouteId, RoutingContext, TableDB,
TableStore, TypedKey, TypedKeyPair, ValueSubkey, VeilidAPI, CRYPTO_KIND_VLD0,
};
use veilid_core::{FromStr, Target};
use crate::error::warn_err;
use crate::proto::codec::{
ChangesResponse, Decodable, Encodable, Envelope, NodeStatus, Request, Response,
};
use crate::proto::crypto::Crypto;
use crate::{error::Result, other_err};
const DHT_N_SUBKEYS: u16 = 3;
const DHT_SUBKEY_STATUS: ValueSubkey = 0;
const TABLE_STORE_LOCAL_N_COLUMNS: u32 = 2;
const TABLE_STORE_LOCAL_COLUMN_DHT_KEY: u32 = 0;
const TABLE_STORE_LOCAL_COLUMN_DHT_OWNER_KEYPAIR: u32 = 1;
const TABLE_STORE_REMOTES_N_COLUMNS: u32 = 1;
const TABLE_STORE_REMOTES_COLUMN_DHT_KEY: u32 = 0;
#[derive(Clone)]
pub struct Sovereign {
dht_key: TypedKey,
dht_owner_keypair: CryptoTyped<KeyPair>,
dht: Option<DHTRecordDescriptor>,
route: Option<Route>,
}
#[derive(Clone)]
struct Route {
id: RouteId,
data: Vec<u8>,
}
#[derive(Clone)]
pub struct Status {
pub site_id: Vec<u8>,
pub db_version: i64,
}
impl Sovereign {
async fn open_db(ts: &TableStore) -> Result<TableDB> {
let db = ts
.open("ddcp_conclave_local", TABLE_STORE_LOCAL_N_COLUMNS)
.await?;
Ok(db)
}
pub fn dht_key(&self) -> TypedKey {
self.dht_key
}
pub fn auth_key(&self) -> TypedKey {
CryptoTyped {
kind: self.dht_owner_keypair.kind,
value: self.dht_owner_keypair.value.key,
}
}
pub async fn init(routing_context: &RoutingContext) -> Result<Sovereign> {
let ts = routing_context.api().table_store()?;
let db = Self::open_db(&ts).await?;
let new_dht = routing_context
.create_dht_record(
veilid_core::DHTSchema::DFLT(DHTSchemaDFLT {
o_cnt: DHT_N_SUBKEYS,
}),
None,
)
.await?;
let dht_owner_keypair = KeyPair::new(
new_dht.owner().to_owned(),
new_dht
.owner_secret()
.ok_or(other_err("expected dht owner secret"))?
.to_owned(),
);
routing_context
.close_dht_record(new_dht.key().to_owned())
.await?;
db.store_json(TABLE_STORE_LOCAL_COLUMN_DHT_KEY, &[], new_dht.key())
.await?;
db.store_json(
TABLE_STORE_LOCAL_COLUMN_DHT_OWNER_KEYPAIR,
&[],
&dht_owner_keypair,
)
.await?;
Self::load(routing_context)
.await?
.ok_or(other_err("failed to initialize sovereign identity"))
}
pub async fn load(routing_context: &RoutingContext) -> Result<Option<Sovereign>> {
let ts = routing_context.api().table_store()?;
let db = Self::open_db(&ts).await?;
let Some(dht_key) = db
.load_json::<TypedKey>(TABLE_STORE_LOCAL_COLUMN_DHT_KEY, &[])
.await?
else {
return Ok(None);
};
let Some(dht_owner_keypair) = db
.load_json::<TypedKeyPair>(TABLE_STORE_LOCAL_COLUMN_DHT_OWNER_KEYPAIR, &[])
.await?
else {
return Ok(None);
};
let dht = routing_context
.open_dht_record(dht_key.clone(), Some(dht_owner_keypair.value.clone()))
.await?;
Ok(Some(Sovereign {
dht_key,
dht_owner_keypair,
dht: Some(dht),
route: None,
}))
}
pub fn release_route(&mut self, routing_context: &RoutingContext) -> Result<()> {
if let Some(ref route) = self.route {
routing_context.api().release_private_route(route.id)?;
self.route = None
}
Ok(())
}
pub async fn announce(
&mut self,
routing_context: &RoutingContext,
status: Status,
) -> Result<()> {
let route = match self.route {
Some(ref route) => route,
None => {
let (id, data) = routing_context.api().new_private_route().await?;
self.route = Some(Route { id, data });
self.route.as_ref().unwrap()
}
};
routing_context
.set_dht_value(
self.dht_key,
DHT_SUBKEY_STATUS,
NodeStatus {
site_id: status.site_id.clone(),
db_version: status.db_version,
key: self.auth_key().to_string(),
route: route.data.clone(),
}
.encode()?,
)
.await?;
Ok(())
}
async fn close(&mut self, routing_context: &RoutingContext) -> Result<()> {
if let Some(dht) = self.dht.take() {
routing_context
.close_dht_record(dht.key().to_owned())
.await?;
}
if let Some(route) = self.route.take() {
routing_context.api().release_private_route(route.id)?;
}
Ok(())
}
}
#[derive(Clone)]
pub struct Peer {
name: String,
dht_key: TypedKey,
dht: Option<DHTRecordDescriptor>,
node_status: Option<NodeStatus>,
route_id: Option<RouteId>,
}
impl Peer {
async fn open_db(ts: &TableStore) -> Result<TableDB> {
let db = ts
.open("ddcp_conclave_remotes", TABLE_STORE_REMOTES_N_COLUMNS)
.await?;
Ok(db)
}
pub async fn new(api: &VeilidAPI, name: &str, dht_public_key: &str) -> Result<Peer> {
let ts = api.table_store()?;
let db = Self::open_db(&ts).await?;
let db_key = name.as_bytes().to_vec();
let dht_key = TypedKey::from_str(dht_public_key)?;
db.store_json(
TABLE_STORE_REMOTES_COLUMN_DHT_KEY,
db_key.as_slice(),
&dht_key,
)
.await?;
Self::load(name, &db, &db_key).await
}
pub async fn load_all(api: &VeilidAPI) -> Result<HashMap<String, Peer>> {
let ts = api.table_store()?;
let db = Self::open_db(&ts).await?;
let mut remotes = HashMap::new();
for remote_key in db
.get_keys(TABLE_STORE_REMOTES_COLUMN_DHT_KEY)
.await?
.iter()
{
let remote_name = String::from_utf8(remote_key.to_owned()).map_err(other_err)?;
let peer = Peer::load(&remote_name, &db, remote_key).await?;
remotes.insert(remote_name, peer);
}
Ok(remotes)
}
async fn load(name: &str, db: &TableDB, db_key: &Vec<u8>) -> Result<Peer> {
let dht_public_key = db
.load_json::<TypedKey>(TABLE_STORE_REMOTES_COLUMN_DHT_KEY, db_key)
.await?
.ok_or(other_err("remote peer missing dht key"))?;
Ok(Peer {
name: name.to_owned(),
dht_key: dht_public_key,
dht: None,
node_status: None,
route_id: None,
})
}
pub async fn remove(self, ts: &TableStore) -> Result<()> {
let db = Self::open_db(ts).await?;
let db_key = self.name.as_bytes();
db.delete(TABLE_STORE_REMOTES_COLUMN_DHT_KEY, db_key)
.await?;
Ok(())
}
pub fn dht_key(&self) -> TypedKey {
self.dht_key
}
pub fn auth_key(&self) -> Result<Option<TypedKey>> {
match self.node_status {
Some(ref ns) => Ok(Some(CryptoTyped::from_str(&ns.key)?)),
None => Ok(None),
}
}
pub fn name(&self) -> String {
self.name.to_owned()
}
pub async fn refresh(
&mut self,
routing_context: &RoutingContext,
) -> Result<Option<NodeStatus>> {
self.refresh_dht(routing_context).await?;
self.refresh_route(routing_context).await?;
Ok(self.node_status())
}
async fn refresh_dht(&mut self, routing_context: &RoutingContext) -> Result<()> {
if let Some(dht) = self.dht.as_ref() {
routing_context
.close_dht_record(dht.key().to_owned())
.await?;
};
self.dht = Some(routing_context.open_dht_record(self.dht_key, None).await?);
self.node_status = match routing_context
.get_dht_value(self.dht_key, DHT_SUBKEY_STATUS, true)
.await?
{
Some(data) => Some(NodeStatus::decode(data.data())?),
None => None,
};
Ok(())
}
async fn refresh_route(&mut self, routing_context: &RoutingContext) -> Result<()> {
if let Some(status) = &self.node_status {
self.route_id = Some(
routing_context
.api()
.import_remote_private_route(status.route.to_vec())?,
);
}
Ok(())
}
pub fn node_status(&self) -> Option<NodeStatus> {
self.node_status.clone()
}
async fn close(&mut self, routing_context: &RoutingContext) -> Result<()> {
if let Some(dht) = self.dht.take() {
routing_context
.close_dht_record(dht.key().to_owned())
.await?;
}
if let Some(route_id) = self.route_id.take() {
routing_context.api().release_private_route(route_id)?;
}
Ok(())
}
}
#[derive(Clone)]
pub struct Conclave {
routing_context: RoutingContext,
sovereign: Sovereign,
remotes: HashMap<String, Peer>,
}
impl Conclave {
pub async fn new(routing_context: RoutingContext) -> Result<Conclave> {
let sovereign = match Sovereign::load(&routing_context).await? {
Some(sov) => sov,
None => Sovereign::init(&routing_context).await?,
};
let remotes = Peer::load_all(&routing_context.api()).await?;
Ok(Conclave {
routing_context,
sovereign,
remotes,
})
}
pub async fn refresh(&mut self, status: Status) -> Result<()> {
self.sovereign
.announce(&self.routing_context, status)
.await?;
self.refresh_peers().await?;
Ok(())
}
pub async fn refresh_peers(&mut self) -> Result<()> {
for peer in self.remotes.values_mut().into_iter() {
peer.refresh(&self.routing_context).await?;
}
Ok(())
}
pub fn sovereign(&self) -> &Sovereign {
return &self.sovereign;
}
pub fn sovereign_mut(&mut self) -> &mut Sovereign {
return &mut self.sovereign;
}
pub fn peer(&self, name: &str) -> Option<&Peer> {
return self.remotes.get(name);
}
pub fn peer_mut(&mut self, name: &str) -> Option<&mut Peer> {
return self.remotes.get_mut(name);
}
pub async fn set_peer(&mut self, peer: Peer) -> Result<()> {
self.remotes.insert(peer.name.clone(), peer);
Ok(())
}
pub async fn remove_peer(&mut self, name: &str) -> Result<bool> {
match self.remotes.remove(name) {
Some(peer) => {
peer.remove(&self.routing_context.api().table_store()?)
.await?;
Ok(true)
}
None => Ok(false),
}
}
pub fn peers<'a>(&'a self) -> std::collections::hash_map::Values<'a, String, Peer> {
self.remotes.values().into_iter()
}
pub async fn changes(&self, peer: &Peer, since_db_version: i64) -> Result<ChangesResponse> {
let crypto = self.crypto(peer)?;
let req_bytes = Envelope {
sender: self.sovereign.dht_key.to_string(),
contents: crypto.encode(Request::Changes { since_db_version })?,
}
.encode()?;
debug!(len = req_bytes.len(), "app_call request");
let peer_route_id = peer.route_id.ok_or(other_err("no route to peer"))?;
let resp_bytes = match self
.routing_context
.app_call(Target::PrivateRoute(peer_route_id), req_bytes)
.await
{
Ok(resp_bytes) => resp_bytes,
Err(e) => {
let _ = warn_err(
self.routing_context
.api()
.release_private_route(peer_route_id),
"failed to release peer route",
);
return Err(crate::Error::VeilidAPI(e));
}
};
debug!(len = resp_bytes.len(), "app_call response");
let resp = Envelope::decode(resp_bytes.as_slice())?;
match crypto.decode::<Response>(&resp.contents)? {
Response::Changes(changes) => Ok(changes),
r => Err(other_err(format!("invalid response: {:?}", r))),
}
}
pub fn crypto(&self, peer: &Peer) -> Result<Crypto> {
Ok(Crypto::new(
self.routing_context
.api()
.crypto()?
.get(CRYPTO_KIND_VLD0)
.ok_or(other_err("VLD0 not available"))?,
self.sovereign.dht_owner_keypair,
peer.auth_key()?.ok_or(other_err("peer key not found"))?,
))
}
pub async fn close(mut self) -> Result<()> {
self.sovereign.close(&self.routing_context).await?;
for peer in self.remotes.values_mut().into_iter() {
peer.close(&self.routing_context).await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use veilid_core::Sequencing;
use crate::tests::api::{setup_api, teardown_api, TEST_API_MUTEX};
use super::*;
#[tokio::test]
async fn basic() {
let _lock = TEST_API_MUTEX.lock().expect("lock");
let api = setup_api().await;
let routing_context = api
.routing_context()
.expect("routing context")
.with_sequencing(Sequencing::PreferOrdered)
.with_default_safety()
.expect("ok");
let mut ccl = Conclave::new(routing_context).await.expect("ok");
assert_eq!(ccl.peers().len(), 0);
let vld0 = ccl
.routing_context
.api()
.crypto()
.expect("crypto")
.get(CRYPTO_KIND_VLD0)
.expect("VLD0");
let peer_keypair = vld0.generate_keypair();
let peer = Peer::new(&api, "bob", &peer_keypair.key.to_string())
.await
.expect("new peer");
ccl.set_peer(peer).await.expect("set peer");
assert_eq!(ccl.peers().len(), 1);
ccl.close().await.expect("ok");
teardown_api(api).await;
}
#[tokio::test]
async fn pair() {
let _lock = TEST_API_MUTEX.lock().expect("lock");
let api = setup_api().await;
let routing_context = api
.routing_context()
.expect("routing context")
.with_sequencing(Sequencing::PreferOrdered)
.with_default_safety()
.expect("ok");
let mut ccl = Conclave::new(routing_context).await.expect("ok");
let vld0 = ccl
.routing_context
.api()
.crypto()
.expect("crypto")
.get(CRYPTO_KIND_VLD0)
.expect("VLD0");
let peer_keypair = CryptoTyped {
kind: CRYPTO_KIND_VLD0,
value: vld0.generate_keypair(),
};
let peer_auth_key = CryptoTyped {
kind: CRYPTO_KIND_VLD0,
value: peer_keypair.value.key,
};
let mut peer = Peer::new(&api, "bob", &peer_auth_key.to_string())
.await
.expect("new peer");
peer.node_status = Some(NodeStatus {
site_id: vec![],
db_version: 42,
key: peer_auth_key.to_string(),
route: vec![],
});
ccl.set_peer(peer.clone()).await.expect("set peer");
let send_crypto = ccl.crypto(&peer).expect("peer crypto");
let recv_crypto = Crypto::new(vld0, peer_keypair, ccl.sovereign().auth_key());
let req = Request::Changes {
since_db_version: 8,
};
let enc_req = send_crypto.encode(req.clone()).expect("encode");
let dec_req = recv_crypto
.decode::<Request>(enc_req.as_slice())
.expect("decode");
assert_eq!(req, dec_req);
let req_send = Envelope {
sender: ccl.sovereign().dht_key().to_string(),
contents: send_crypto.encode(req.clone()).expect("encode"),
};
let msg = req_send.encode().expect("encode");
let req_recv = Envelope::decode(msg.as_slice()).expect("decode");
let req_decrypt = recv_crypto
.decode::<Request>(req_recv.contents.as_slice())
.expect("decode");
assert_eq!(req, req_decrypt);
ccl.close().await.expect("ok");
teardown_api(api).await;
}
}