#[cfg(feature = "preview")]
use std::time::Duration;
use aranya_daemon_api::SyncPeerConfig;
#[cfg(feature = "preview")]
use aranya_runtime::Address;
use tokio::sync::{mpsc, oneshot};
use tracing::trace;
#[cfg(feature = "preview")]
use super::GraphId;
use super::{Error, Result, SyncPeer};
/// Request delivered to the sync manager through a [`SyncHandle`].
///
/// Every variant is constructed by one method on `SyncHandle` and travels
/// over the handle's channel paired with a one-shot reply sender.
///
/// `Debug` is required because the handle logs each message with
/// `trace!(?msg, ..)` before queueing it.
///
/// NOTE(review): how the manager reacts to each variant is not visible in
/// this file; the per-variant notes below only describe what is carried.
#[derive(Clone, Debug)]
pub(crate) enum ManagerMessage {
    /// Built by `SyncHandle::add_peer`: a peer together with its sync
    /// configuration.
    AddPeer {
        peer: SyncPeer,
        cfg: SyncPeerConfig,
    },
    /// Built by `SyncHandle::remove_peer`: the peer to remove.
    RemovePeer {
        peer: SyncPeer,
    },
    /// Built by `SyncHandle::sync_now`: a peer and an optional configuration.
    ///
    /// NOTE(review): presumably `None` means "use the manager's existing or
    /// default configuration" — confirm against the manager.
    SyncNow {
        peer: SyncPeer,
        cfg: Option<SyncPeerConfig>,
    },
    /// Built by `SyncHandle::sync_hello_subscribe` (`preview` feature only).
    ///
    /// NOTE(review): the exact meaning of the three durations is defined by
    /// the manager and cannot be confirmed from this file.
    #[cfg(feature = "preview")]
    HelloSubscribe {
        peer: SyncPeer,
        graph_change_debounce: Duration,
        duration: Duration,
        schedule_delay: Duration,
    },
    /// Built by `SyncHandle::sync_hello_unsubscribe` (`preview` feature only).
    #[cfg(feature = "preview")]
    HelloUnsubscribe {
        peer: SyncPeer,
    },
    /// Built by `SyncHandle::broadcast_hello` (`preview` feature only): a
    /// graph ID and a head address.
    #[cfg(feature = "preview")]
    BroadcastHello {
        graph_id: GraphId,
        head: Address,
    },
    /// Built by `SyncHandle::hello_subscribe_request` (`preview` feature
    /// only). Carries the same fields as `HelloSubscribe`.
    ///
    /// NOTE(review): presumably represents a subscription request received
    /// from `peer`, as opposed to one issued locally — confirm.
    #[cfg(feature = "preview")]
    HelloSubscribeRequest {
        peer: SyncPeer,
        graph_change_debounce: Duration,
        duration: Duration,
        schedule_delay: Duration,
    },
    /// Built by `SyncHandle::hello_unsubscribe_request` (`preview` feature
    /// only).
    #[cfg(feature = "preview")]
    HelloUnsubscribeRequest {
        peer: SyncPeer,
    },
    /// Built by `SyncHandle::sync_on_hello` (`preview` feature only): a peer
    /// and a head address.
    #[cfg(feature = "preview")]
    SyncOnHello {
        peer: SyncPeer,
        head: Address,
    },
}
/// Cloneable handle used to submit [`ManagerMessage`]s to the sync manager.
///
/// The handle wraps the sending half of a bounded `tokio` mpsc channel; each
/// queued item is a [`Callback`], i.e. the message plus a one-shot sender on
/// which the reply is returned.
#[derive(Clone, Debug)]
pub(crate) struct SyncHandle {
    /// Sending half of the channel created by `SyncHandle::channel`.
    sender: mpsc::Sender<Callback>,
}
impl SyncHandle {
pub(crate) fn channel(buffer: usize) -> (Self, mpsc::Receiver<Callback>) {
let (tx, rx) = mpsc::channel(buffer);
(Self { sender: tx }, rx)
}
pub(crate) async fn add_peer(&self, peer: SyncPeer, cfg: SyncPeerConfig) -> Response {
self.send(ManagerMessage::AddPeer { peer, cfg }).await
}
pub(crate) async fn remove_peer(&self, peer: SyncPeer) -> Response {
self.send(ManagerMessage::RemovePeer { peer }).await
}
pub(crate) async fn sync_now(&self, peer: SyncPeer, cfg: Option<SyncPeerConfig>) -> Response {
self.send(ManagerMessage::SyncNow { peer, cfg }).await
}
#[cfg(feature = "preview")]
pub(crate) async fn sync_hello_subscribe(
&self,
peer: SyncPeer,
graph_change_debounce: Duration,
duration: Duration,
schedule_delay: Duration,
) -> Response {
self.send(ManagerMessage::HelloSubscribe {
peer,
graph_change_debounce,
duration,
schedule_delay,
})
.await
}
#[cfg(feature = "preview")]
pub(crate) async fn sync_hello_unsubscribe(&self, peer: SyncPeer) -> Response {
self.send(ManagerMessage::HelloUnsubscribe { peer }).await
}
#[cfg(feature = "preview")]
pub(crate) async fn broadcast_hello(&self, graph_id: GraphId, head: Address) -> Response {
self.send(ManagerMessage::BroadcastHello { graph_id, head })
.await
}
#[cfg(feature = "preview")]
pub(super) async fn hello_subscribe_request(
&self,
peer: SyncPeer,
graph_change_debounce: Duration,
duration: Duration,
schedule_delay: Duration,
) -> Response {
self.send(ManagerMessage::HelloSubscribeRequest {
peer,
graph_change_debounce,
duration,
schedule_delay,
})
.await
}
#[cfg(feature = "preview")]
pub(super) async fn hello_unsubscribe_request(&self, peer: SyncPeer) -> Response {
self.send(ManagerMessage::HelloUnsubscribeRequest { peer })
.await
}
#[cfg(feature = "preview")]
pub(super) async fn sync_on_hello(&self, peer: SyncPeer, head: Address) -> Response {
self.send(ManagerMessage::SyncOnHello { peer, head }).await
}
async fn send(&self, msg: ManagerMessage) -> Response {
trace!(?msg, "sending message to sync manager");
let (tx, rx) = oneshot::channel();
self.sender
.send((msg, tx))
.await
.map_err(|_| Error::SyncerShutdown)?;
rx.await.map_err(|_| Error::SyncerShutdown)?
}
}
/// Item carried on the `SyncHandle` channel: a [`ManagerMessage`] paired with
/// the one-shot sender on which the reply to that message is returned.
pub(crate) type Callback = (ManagerMessage, oneshot::Sender<Response>);
/// Reply to a [`ManagerMessage`]: `Ok(())` on success, otherwise an error.
type Response = Result<()>;