use std::{collections::HashMap, sync::Arc, time::Duration};
use flume::{unbounded, Receiver, Sender};
use tokio::{select, signal, task::JoinHandle};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, instrument, trace, warn, Level};
use veilid_core::{CryptoKey, CryptoTyped, RoutingContext, Sequencing, VeilidUpdate};
pub mod cli;
mod db;
mod error;
mod ident;
mod proto;
pub mod veilid_config;
use db::DB;
pub use error::{other_err, warn_err, Error, Result};
use ident::{Conclave, Peer, Status};
use proto::codec::{
ChangesResponse, Decodable, Encodable, Envelope, NodeStatus, Request, Response, StatusResponse,
};
#[derive(Clone)]
pub struct DDCP {
db: DB,
routing_context: RoutingContext,
conclave: Conclave,
updates: Receiver<VeilidUpdate>,
}
static LOCAL_TRACKER_PERIOD: Duration = Duration::from_secs(60);
static REMOTE_TRACKER_PERIOD: Duration = Duration::from_secs(60);
impl DDCP {
pub async fn new(db_path: Option<&str>, state_path: &str, ext_path: &str) -> Result<DDCP> {
let db = DB::new(db_path, ext_path).await?;
let (routing_context, updates) = DDCP::new_routing_context(state_path).await?;
Ok(DDCP {
db,
conclave: Conclave::new(routing_context.clone()).await?,
routing_context,
updates,
})
}
async fn new_routing_context(
state_path: &str,
) -> Result<(RoutingContext, Receiver<VeilidUpdate>)> {
let (node_sender, updates): (
Sender<veilid_core::VeilidUpdate>,
Receiver<veilid_core::VeilidUpdate>,
) = unbounded();
let update_callback = Arc::new(move |change: veilid_core::VeilidUpdate| {
let _ = node_sender.send(change);
});
let config_state_path = Arc::new(state_path.to_owned());
let config_callback =
Arc::new(move |key| veilid_config::callback(config_state_path.to_string(), key));
let api: veilid_core::VeilidAPI =
veilid_core::api_startup(update_callback, config_callback).await?;
api.attach().await?;
let routing_context = api
.routing_context()?
.with_sequencing(Sequencing::EnsureOrdered)
.with_default_safety()?;
Ok((routing_context, updates))
}
#[instrument(skip(self), level = Level::INFO, err)]
pub async fn wait_for_network(&mut self) -> Result<()> {
loop {
let res = self.updates.recv_async().await;
match res {
Ok(VeilidUpdate::Attachment(attachment)) => {
if attachment.public_internet_ready {
info!(
state = attachment.state.to_string(),
public_internet_ready = attachment.public_internet_ready,
"Connected"
);
break;
}
info!(
state = attachment.state.to_string(),
public_internet_ready = attachment.public_internet_ready,
"Waiting for network"
);
}
Ok(u) => {
trace!(update = format!("{:?}", u));
}
Err(e) => {
return Err(Error::Other(e.to_string()));
}
};
}
loop {
match self.push().await {
Ok(_) => return Ok(()),
Err(e) => {
warn!(err = format!("{:?}", e), "failed to push status");
}
}
}
}
pub fn addr(&self) -> String {
let dht_key = self.conclave.sovereign().dht_key();
return dht_key.to_string();
}
#[instrument(skip(self), level = Level::DEBUG, ret, err)]
pub async fn push(&mut self) -> Result<(String, Vec<u8>, i64)> {
let (site_id, db_version) = self.db.status().await?;
self.conclave
.refresh(Status {
site_id: site_id.clone(),
db_version,
})
.await?;
Ok((
self.conclave.sovereign().dht_key().to_string(),
site_id,
db_version,
))
}
#[instrument(skip(self), level = Level::DEBUG, ret, err)]
pub async fn pull(&mut self, name: &str) -> Result<(bool, i64)> {
{
let peer = self
.conclave
.peer_mut(name)
.ok_or(other_err("unknown peer"))?;
peer.refresh(&self.routing_context).await?;
}
let peer = self.conclave.peer(name).ok_or(other_err("unknown peer"))?;
let node_status = peer.node_status().ok_or(other_err("peer missing status"))?;
self.pull_from(peer, &node_status).await
}
async fn pull_from(&self, peer: &Peer, peer_status: &NodeStatus) -> Result<(bool, i64)> {
let tracked_version = self
.db
.tracked_peer_version(peer_status.site_id.clone())
.await?;
if peer_status.db_version <= tracked_version {
return Ok((false, tracked_version));
}
debug!(
db_version = peer_status.db_version,
tracked_version, "pulling changes"
);
let resp = self.conclave.changes(peer, tracked_version).await?;
if resp.site_id.as_slice() != peer_status.site_id.as_slice() {
return Err(other_err("mismatched site_id"));
}
let merge_version = self.db.merge(resp.site_id, resp.changes).await?;
Ok((true, merge_version))
}
#[instrument(skip(self), level = Level::DEBUG, err)]
pub async fn cleanup(self) -> Result<()> {
let _ = warn_err(
self.conclave.close().await,
"failed to release conclave resources",
);
self.routing_context.api().shutdown().await;
self.db.close().await?;
Ok(())
}
#[instrument(skip(self), level = Level::DEBUG, err)]
pub async fn remote_add(&mut self, name: String, addr: String) -> Result<()> {
let peer = Peer::new(&self.routing_context.api(), name.as_str(), addr.as_str()).await?;
self.conclave.set_peer(peer).await?;
Ok(())
}
#[instrument(skip(self), level = Level::DEBUG, ret, err)]
pub async fn remote_remove(&mut self, name: String) -> Result<bool> {
self.conclave.remove_peer(name.as_str()).await
}
pub fn remotes(&self) -> Vec<(String, CryptoTyped<CryptoKey>)> {
self.conclave
.peers()
.map(|peer| (peer.name(), peer.dht_key()))
.collect()
}
#[instrument(skip(self), level = Level::DEBUG, err)]
pub async fn serve(&self) -> Result<()> {
info!(addr = self.addr(), "starting server");
let token = CancellationToken::new();
let puller = self.spawn_puller(token.clone());
let server = self.spawn_server(token.clone());
let interrupter = tokio::spawn(async move {
signal::ctrl_c().await?;
warn!("interrupt received");
token.cancel();
Ok::<(), Error>(())
});
info!(addr = self.addr(), "server started");
let _ = tokio::join!(puller, server, interrupter);
Ok(())
}
async fn spawn_puller(&self, token: CancellationToken) -> JoinHandle<Result<()>> {
let mut puller = self.clone();
tokio::spawn(async move {
debug!("puller task starting");
let mut timer = tokio::time::interval(REMOTE_TRACKER_PERIOD);
loop {
select! {
_ = timer.tick() => {
info!("refreshing peers");
if let Err(_) = warn_err(puller.conclave.refresh_peers().await, "refresh failed") {
continue
}
for peer in puller.conclave.peers() {
info!(name = peer.name(), "pulling from peer");
let peer_status = match peer.node_status() {
Some(status) => status,
None => {
error!("peer missing status");
continue
}
};
if let Ok(_) = warn_err(puller.pull_from(peer, &peer_status).await, "pull failed") {
info!(name = peer.name(), "pull ok");
}
}
}
_ = token.cancelled() => {
return Ok(());
}
}
}
})
}
async fn spawn_server(&self, token: CancellationToken) -> JoinHandle<Result<()>> {
let mut server = self.clone();
tokio::spawn(async move {
debug!("server task starting");
let mut timer = tokio::time::interval(LOCAL_TRACKER_PERIOD);
let peer_by_key = HashMap::from_iter(
server
.conclave
.peers()
.map(|peer| (peer.dht_key().to_string(), peer.clone())),
);
loop {
select! {
_ = timer.tick() => {
let _ = warn_err(server.push().await, "push failed");
}
res = server.updates.recv_async() => {
match res {
Ok(update) => {
let _ = warn_err(server.handle_update(&peer_by_key, update).await, "failed to handle update");
}
Err(e) => return Err(other_err(e)),
}
}
_ = token.cancelled() => {
return Ok(());
}
}
}
})
}
async fn handle_update(
&mut self,
peer_by_sender: &HashMap<String, Peer>,
update: VeilidUpdate,
) -> Result<()> {
let sender = self.conclave.sovereign().dht_key().to_string();
match update {
VeilidUpdate::AppCall(app_call) => {
let envelope = Envelope::decode(app_call.message())?;
let peer = match peer_by_sender.get(&envelope.sender) {
Some(peer) => peer.clone(),
None => {
warn!(sender = envelope.sender, "unknown peer");
return Ok(());
}
};
let responder = self.clone();
tokio::spawn(async move {
let crypto = responder.conclave.crypto(&peer)?;
let request = crypto.decode::<Request>(envelope.contents.as_slice())?;
debug!(request = format!("{:?}", request), "handling app_call");
let resp = match request {
Request::Status => {
let (site_id, db_version) = responder.db.status().await?;
crypto.encode(Response::Status(StatusResponse {
site_id,
db_version,
}))?
}
Request::Changes { since_db_version } => {
let (site_id, changes) = responder.db.changes(since_db_version).await?;
crypto
.encode(Response::Changes(ChangesResponse { site_id, changes }))?
}
};
responder
.routing_context
.api()
.app_call_reply(
app_call.id(),
Envelope {
sender: sender.to_owned(),
contents: resp,
}
.encode()?,
)
.await?;
Ok::<(), Error>(())
}); Ok(())
}
VeilidUpdate::Shutdown => Err(Error::VeilidAPI(veilid_core::VeilidAPIError::Shutdown)),
VeilidUpdate::RouteChange(_) => {
self.conclave
.sovereign_mut()
.release_route(&self.routing_context)?;
Ok(())
}
_ => Ok(()),
}
}
}
#[cfg(test)]
mod tests;