#[cfg(feature = "preview")]
use std::time::Duration;
use aranya_crypto::Rng;
#[cfg(feature = "preview")]
use aranya_runtime::{Address, Storage as _, SyncHelloType, SyncType};
use aranya_runtime::{
Command as _, PolicyStore, Sink, StorageProvider, SyncRequester, TraversalBuffers,
};
use buggy::BugExt as _;
use derive_where::derive_where;
use tokio::sync::mpsc;
use tracing::{debug, error, info, instrument, trace, warn};
use super::{
transport::{SyncConnector, SyncStream as _},
Error, GraphId, Result, SyncPeer, SyncResponse,
};
use crate::{
aranya::{Client, InvalidGraphs},
vm_policy::VecSink,
};
/// State used to run syncs against remote peers.
///
/// Type parameters:
/// - `C`: the connector stored in `connector`; the methods that open streams
///   require `C: SyncConnector`.
/// - `PS`, `SP`: the type parameters of the wrapped [`Client`]; the methods that
///   touch graph storage require `PS: PolicyStore` and `SP: StorageProvider`.
/// - `EF`: the element type of the effect batches sent over `send_effects`.
///
/// `Debug` is derived via `derive_where` with a bound on `C` only.
#[derive_where(Debug; C)]
pub(crate) struct SyncClient<C, PS, SP, EF> {
/// The wrapped Aranya client; locked whenever graph state is read or updated.
pub(super) client: Client<PS, SP>,
/// Used to open a stream to a peer.
connector: C,
/// Channel on which the effects produced by a sync are sent, tagged with the
/// graph they belong to.
send_effects: mpsc::Sender<(GraphId, Vec<EF>)>,
/// Buffers handed to the runtime calls that take a traversal buffer.
/// Excluded from the derived `Debug` output.
#[derive_where(skip(Debug))]
traversal: TraversalBuffers,
}
impl<C, PS, SP, EF> SyncClient<C, PS, SP, EF> {
    /// Assembles a sync client from its parts.
    ///
    /// The traversal buffers are not supplied by the caller; a fresh set is
    /// created with [`TraversalBuffers::new`].
    pub(crate) fn new(
        client: Client<PS, SP>,
        connector: C,
        send_effects: mpsc::Sender<(GraphId, Vec<EF>)>,
    ) -> Self {
        let traversal = TraversalBuffers::new();
        Self {
            client,
            connector,
            send_effects,
            traversal,
        }
    }

    /// Returns the [`InvalidGraphs`] reference exposed by the wrapped client.
    pub(crate) fn invalid_graphs(&self) -> &InvalidGraphs {
        let Self { client, .. } = self;
        client.invalid_graphs()
    }
}
impl<C, PS, SP, EF> SyncClient<C, PS, SP, EF>
where
    C: SyncConnector,
{
    /// Sends a hello *subscribe* request for `peer.graph_id` to `peer`.
    ///
    /// The three durations are forwarded unchanged in the subscribe message
    /// (`graph_change_debounce` is sent as the `graph_change_delay` field).
    /// `buffer` is scratch space: the request is serialized into it and the
    /// peer's reply is read back into it.
    ///
    /// # Errors
    ///
    /// Fails for the same reasons as [`Self::send_hello_request`].
    #[cfg(feature = "preview")]
    pub(super) async fn send_hello_subscribe(
        &self,
        peer: SyncPeer,
        graph_change_debounce: Duration,
        duration: Duration,
        schedule_delay: Duration,
        buffer: &mut [u8],
    ) -> Result<()> {
        trace!(?peer, "subscribing to hello notifications from peer");

        let subscribe = SyncHelloType::Subscribe {
            graph_change_delay: graph_change_debounce,
            duration,
            schedule_delay,
            graph_id: peer.graph_id,
        };
        self.send_hello_request(peer, SyncType::Hello(subscribe), buffer)
            .await
    }

    /// Sends a hello *unsubscribe* request for `peer.graph_id` to `peer`.
    ///
    /// `buffer` is scratch space: the request is serialized into it and the
    /// peer's reply is read back into it.
    ///
    /// # Errors
    ///
    /// Fails for the same reasons as [`Self::send_hello_request`].
    #[cfg(feature = "preview")]
    pub(super) async fn send_hello_unsubscribe(
        &self,
        peer: SyncPeer,
        buffer: &mut [u8],
    ) -> Result<()> {
        trace!(?peer, "unsubscribing from hello notifications from peer");

        let unsubscribe = SyncHelloType::Unsubscribe {
            graph_id: peer.graph_id,
        };
        self.send_hello_request(peer, SyncType::Hello(unsubscribe), buffer)
            .await
    }

    /// Sends a hello message carrying `head` for `peer.graph_id` to `peer`.
    ///
    /// Unlike [`Self::send_hello_request`], this does not read a reply: the
    /// stream is finished right after the message has been sent. `buffer` is
    /// only used to serialize the outgoing message.
    ///
    /// # Errors
    ///
    /// Returns a transport error if connecting, sending, or finishing the
    /// stream fails, and a serialization error if the message cannot be
    /// encoded into `buffer`.
    #[cfg(feature = "preview")]
    #[instrument(skip_all, fields(peer = %peer.addr, graph = %peer.graph_id))]
    pub(super) async fn send_hello_notification(
        &mut self,
        peer: SyncPeer,
        head: Address,
        buffer: &mut [u8],
    ) -> Result<()> {
        trace!(?peer, "sending hello notifications to peer");

        let hello = SyncHelloType::Hello {
            head,
            graph_id: peer.graph_id,
        };
        let message = SyncType::Hello(hello);

        // The stream is opened before the message is serialized.
        let mut stream = self
            .connector
            .connect(peer)
            .await
            .map_err(Error::transport)?;

        let encoded = postcard::to_slice(&message, buffer)?;
        stream.send(encoded).await.map_err(Error::transport)?;
        stream.finish().await.map_err(Error::transport)?;

        Ok(())
    }

    /// Sends `sync_type` to `peer` and waits for a reply.
    ///
    /// The message is serialized into `buffer`, sent, and the sending side of
    /// the stream is finished; the reply is then read back into `buffer`.
    /// The reply's contents are not inspected, only its length.
    ///
    /// # Errors
    ///
    /// Returns a transport error if connecting, sending, finishing, or
    /// receiving fails; a serialization error if the message cannot be
    /// encoded into `buffer`; and [`Error::EmptyResponse`] if the peer's
    /// reply is zero bytes long.
    #[cfg(feature = "preview")]
    pub(super) async fn send_hello_request(
        &self,
        peer: SyncPeer,
        sync_type: SyncType,
        buffer: &mut [u8],
    ) -> Result<()> {
        let mut stream = self
            .connector
            .connect(peer)
            .await
            .map_err(Error::transport)?;

        let encoded = postcard::to_slice(&sync_type, buffer)?;
        stream.send(encoded).await.map_err(Error::transport)?;
        stream.finish().await.map_err(Error::transport)?;

        // Any non-empty reply is accepted.
        let reply_len = stream.receive(buffer).await.map_err(Error::transport)?;
        if reply_len == 0 {
            return Err(Error::EmptyResponse);
        }
        Ok(())
    }
}
impl<C, PS, SP, EF> SyncClient<C, PS, SP, EF>
where
    PS: PolicyStore,
    SP: StorageProvider,
{
    /// Asks the locked Aranya client whether `head` exists in `graph_id`.
    ///
    /// The primary traversal buffer is lent to the lookup.
    #[cfg(feature = "preview")]
    pub(super) async fn command_exists(&mut self, graph_id: GraphId, head: Address) -> bool {
        let Self {
            client, traversal, ..
        } = self;
        client
            .lock_aranya()
            .await
            .command_exists(graph_id, head, &mut traversal.primary)
    }

    /// Looks up the head address of `graph_id`.
    ///
    /// Returns `None` if the graph's storage cannot be obtained or if reading
    /// its head address fails; the underlying errors are discarded.
    #[cfg(feature = "preview")]
    pub(super) async fn get_head(&self, graph_id: GraphId) -> Option<Address> {
        let mut aranya = self.client.lock_aranya().await;
        let storage = aranya.provider().get_storage(graph_id).ok()?;
        storage.get_head_address().ok()
    }

    /// Applies the payload of a sync response to the local graph.
    ///
    /// `data` is handed to `requester` for decoding. If `data` is empty, or
    /// decoding yields no commands, nothing is changed and `Ok(0)` is
    /// returned. Otherwise the commands are added in a transaction on
    /// `peer.graph_id`, the transaction is committed, and the cache entry for
    /// `peer` is updated with the addresses of the received commands.
    /// Effects emitted along the way go to `sink`.
    ///
    /// Returns the number of commands received.
    ///
    /// # Errors
    ///
    /// Propagates failures from decoding the response, adding the commands,
    /// committing the transaction, or updating the heads.
    async fn process_sync_data<S: Sink<PS::Effect>>(
        &mut self,
        peer: SyncPeer,
        data: &[u8],
        requester: &mut SyncRequester,
        sink: &mut S,
    ) -> Result<usize> {
        if data.is_empty() {
            debug!(?peer, "sync response contained no data");
            return Ok(0);
        }

        // Treat "no command list" and "empty command list" the same way.
        let cmds = match requester.receive(data)?.filter(|batch| !batch.is_empty()) {
            Some(batch) => batch,
            None => {
                debug!(?peer, "sync response contained no new commands");
                return Ok(0);
            }
        };
        let cmd_count = cmds.len();
        trace!(?peer, cmd_count, "processing received commands");

        let (mut aranya, mut caches) = self.client.lock_aranya_and_caches().await;

        let mut trx = aranya.transaction(peer.graph_id);
        aranya.add_commands(&mut trx, sink, &cmds, &mut self.traversal.primary)?;
        aranya.commit(trx, sink, &mut self.traversal.primary)?;

        // Commands whose address cannot be computed are skipped here.
        let addresses = cmds.iter().filter_map(|cmd| cmd.address().ok());
        let peer_cache = caches.entry(peer).or_default();
        aranya.update_heads(
            peer.graph_id,
            addresses,
            peer_cache,
            &mut self.traversal.primary,
        )?;

        debug!(?peer, cmd_count, "committed commands from sync");
        Ok(cmd_count)
    }
}
impl<C, PS, SP, EF> SyncClient<C, PS, SP, EF>
where
C: SyncConnector,
PS: PolicyStore,
SP: StorageProvider,
EF: Send + Sync + 'static + TryFrom<PS::Effect>,
EF::Error: Send + Sync + 'static + std::error::Error,
{
#[instrument(skip_all, fields(peer = %peer.addr, graph = %peer.graph_id))]
pub(crate) async fn sync(&mut self, peer: SyncPeer, buffer: &mut [u8]) -> Result<usize> {
debug!(?peer, "starting sync");
let mut stream = self.connector.connect(peer).await.map_err(|error| {
warn!(?peer, %error, "failed to connect to peer");
Error::transport(error)
})?;
let mut requester = SyncRequester::new(peer.graph_id, Rng);
let (len, _cmds) = {
let (mut aranya, mut caches) = self.client.lock_aranya_and_caches().await;
requester.poll(
buffer,
aranya.provider(),
caches.entry(peer).or_default(),
&mut self.traversal.primary,
)
}?;
let buf = buffer.get(..len).assume("valid offset")?;
trace!(?peer, request_bytes = len, "sending sync request");
stream.send(buf).await.map_err(Error::transport)?;
stream.finish().await.map_err(Error::transport)?;
let len = stream.receive(buffer).await.map_err(Error::transport)?;
trace!(?peer, response_bytes = len, "received sync response");
let buf = buffer.get(..len).assume("valid offset")?;
let resp = postcard::from_bytes(buf)?;
let data = match resp {
SyncResponse::Ok(data) => data,
SyncResponse::Err(msg) => {
error!(?peer, %msg, "peer returned sync error");
return Err(Error::Response(msg));
}
};
let mut sink = VecSink::new();
let cmd_count = self
.process_sync_data(peer, &data, &mut requester, &mut sink)
.await?;
let effects = sink
.collect()
.map_err(|e| Error::EffectsSink(Box::new(e)))?;
let effects_count = effects.len();
if let Err(error) = self.send_effects.send((peer.graph_id, effects)).await {
debug!(?error, "effect handler closed, discarding effects");
}
info!(?peer, cmd_count, effects_count, "sync completed");
Ok(cmd_count)
}
}