use crate::{
replication::{self},
Database, Store, SyncBoxFuture,
};
use crate::{KeyVer, Lang};
use async_std::{
sync::{Arc, Mutex},
task,
};
use futures::{
channel::mpsc::{self, UnboundedSender},
stream::FuturesUnordered,
StreamExt,
};
use serde::{Deserialize, Serialize};
use tracing::{debug, debug_span, trace, warn, Instrument};
/// Numeric identifier of a peer: a newtype around a `u64`.
///
/// In this file it keys the connection table and tags each response future
/// with the peer the response belongs to.
#[derive(Clone, Copy, Default, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerID(pub u64);
impl std::fmt::Debug for PeerID {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("#{}", self.0))
}
}
/// An `edelcrantz` connection specialised to this crate's message enums.
type Conn<L> = edelcrantz::Connection<OneWay<L>, Req<L>, Res<L>>;
/// The `edelcrantz` queue type matching [`Conn`].
type Queue<L> = edelcrantz::Queue<OneWay<L>, Req<L>, Res<L>>;
/// A shared, mutex-guarded connection paired with its queue handle.
pub(crate) type Connection<L> = (Arc<Mutex<Conn<L>>>, Queue<L>);
/// A boxed future resolving to the responding peer's id and either its
/// response or an `edelcrantz` error.
pub(crate) type ResponseFuture<L> = SyncBoxFuture<(PeerID, Result<Res<L>, edelcrantz::Error>)>;
/// An unordered set of in-flight [`ResponseFuture`]s.
pub(crate) type ResponseFutures<L> = FuturesUnordered<ResponseFuture<L>>;
/// A request sent to a peer that is answered with a `Res`.
///
/// `#[serde(bound = "")]` replaces the trait bounds serde would otherwise
/// infer for `L` with an empty set. The attribute is placed after `#[derive]`
/// because a derive helper attribute used before the derive that introduces
/// it triggers rustc's `legacy_derive_helpers` future-compatibility lint.
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(bound = "")]
pub(crate) enum Req<L: Lang> {
    /// Carries a `replication::WriteRequest`.
    WriteReq(replication::WriteRequest<L>),
    /// Carries a `replication::GetRequest`.
    GetReq(replication::GetRequest<L>),
}
/// A response returned for a `Req`.
///
/// `#[serde(bound = "")]` replaces the trait bounds serde would otherwise
/// infer for `L` with an empty set. The attribute is placed after `#[derive]`
/// because a derive helper attribute used before the derive that introduces
/// it triggers rustc's `legacy_derive_helpers` future-compatibility lint.
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(bound = "")]
pub(crate) enum Res<L: Lang> {
    /// Carries a `replication::WriteResponse`.
    WriteRes(replication::WriteResponse),
    /// Carries a `replication::GetResponse`.
    GetRes(replication::GetResponse<L>),
}
/// A message for which no response value is produced by the receiver's
/// handler.
///
/// `#[serde(bound = "")]` replaces the trait bounds serde would otherwise
/// infer for `L` with an empty set. The attribute is placed after `#[derive]`
/// because a derive helper attribute used before the derive that introduces
/// it triggers rustc's `legacy_derive_helpers` future-compatibility lint.
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(bound = "")]
pub(crate) enum OneWay<L: Lang> {
    /// Carries a `crate::agreement::Msg`.
    LatticeAgreementMsg(crate::agreement::Msg),
    /// Carries a `replication::PutOneWay`.
    PutMsg(replication::PutOneWay<L>),
}
impl<L: Lang, S: Store<L>> Database<L, S> {
/// Registers a connection to `peer` over the transport `io`.
///
/// Wraps `io` in an `edelcrantz::Connection`, pairs the connection (behind an
/// `Arc<Mutex<_>>`) with a clone of its queue, and inserts the pair into
/// `self.connections` under `peer`. The return value of `insert` is ignored.
pub async fn connect<IO: edelcrantz::AsyncReadWrite>(&self, peer: PeerID, io: IO) {
    debug!("connecting peers self={:?} to other={:?}", self.self_id, peer);
    let connection = edelcrantz::Connection::new(io);
    let queue_handle = connection.queue.clone();
    let entry = (Arc::new(Mutex::new(connection)), queue_handle);
    // The write guard is a temporary that lives only for this statement.
    self.connections.write().await.insert(peer, entry);
}
/// Boxed form of `serve_req_`: clones `self` into the future so the returned
/// value does not borrow from `self`.
pub(crate) fn serve_req(&self, req: Req<L>) -> SyncBoxFuture<Res<L>> {
    let db = self.clone();
    let response = async move { db.serve_req_(req).await };
    Box::pin(response)
}
/// Dispatches `req` to `serve_write` or `serve_get` and wraps the result in
/// the matching `Res` variant, tracing the request and the response.
pub(crate) async fn serve_req_(&self, req: Req<L>) -> Res<L> {
    match req {
        Req::GetReq(get_req) => {
            trace!("get request {:?}", get_req);
            let reply = self.serve_get(get_req).await;
            trace!("get response {:?}", reply);
            Res::GetRes(reply)
        }
        Req::WriteReq(write_req) => {
            trace!("write request {:?}", write_req);
            let reply = self.serve_write(write_req).await;
            trace!("write response {:?}", reply);
            Res::WriteRes(reply)
        }
    }
}
/// Forwards a one-way message received from `remote` onto the channel that
/// matches its variant.
///
/// Agreement messages go to `agreement_send` as-is; put messages go to
/// `putmsg_send` tagged with `remote`.
///
/// # Panics
///
/// Panics if `unbounded_send` on the chosen channel returns an error.
pub(crate) fn serve_oneway(
    &self,
    remote: PeerID,
    agreement_send: UnboundedSender<crate::agreement::Msg>,
    putmsg_send: UnboundedSender<(PeerID, crate::replication::PutOneWay<L>)>,
    ow: OneWay<L>,
) {
    match ow {
        OneWay::PutMsg(put) => {
            trace!("put msg {:?}", put);
            let tagged = (remote, put);
            putmsg_send.unbounded_send(tagged).unwrap();
        }
        OneWay::LatticeAgreementMsg(msg) => {
            trace!("agreement msg {:?}", msg);
            agreement_send.unbounded_send(msg).unwrap();
        }
    }
}
/// Builds the future that spawns this database's background worker tasks.
///
/// Nothing is spawned until the returned future is polled. When it runs, it
/// creates two unbounded channels and spawns:
/// - a task that receives `(PeerID, PutOneWay)` pairs and spawns one
///   `serve_put` task per received message;
/// - a task that runs `run_proposal_loop` on the agreement-message receiver;
/// - one task per entry found in `self.connections` at that moment, which
///   repeatedly locks the connection and calls `advance` on it.
///
/// Connections are read from `self.connections` once, in the `for` loop below.
pub fn launch_workers(&self) -> SyncBoxFuture<()> {
let this: Database<L, S> = self.clone();
let fut = async move {
// Channels carrying one-way messages from the per-connection tasks to the
// two consumer tasks spawned below.
let (agreement_send, agreement_recv) = mpsc::unbounded();
let (putmsg_send, mut putmsg_recv) = mpsc::unbounded();
// Put consumer: every received message is served on its own spawned task.
// `p` counts received messages and is recorded on each task's span.
task::spawn({
let this = this.clone();
let span = debug_span!("put", peer=?this.self_id);
(async move {
let mut p: usize = 0;
while let Some((remote, msg)) = putmsg_recv.next().await {
p += 1;
let this = this.clone();
let span = debug_span!("put-rpc-task", peer=?this.self_id, ?remote, ?p);
task::spawn(async move { this.serve_put(msg).await }.instrument(span));
}
})
.instrument(span)
});
// Agreement consumer: hands the receiver to `run_proposal_loop`.
task::spawn({
let this = this.clone();
let span = debug_span!("agreement", peer=?this.self_id);
async move {
this.run_proposal_loop(agreement_recv)
.instrument(span)
.await;
}
});
// One service task per connection currently registered.
for (id, conn) in this.connections.read().await.iter() {
let id = id.clone();
let conn = conn.0.clone();
let a_send = agreement_send.clone();
let p_send = putmsg_send.clone();
task::spawn({
let this = this.clone();
async move {
// `n` counts loop iterations; it is recorded in spans and trace output.
let mut n: usize = 0;
loop {
n += 1;
// Request handler: serves each request on its own spawned task.
// Built fresh every iteration because it is moved into `advance`.
let sreq = {
let this = this.clone();
let span =
debug_span!("rpc-task", peer=?this.self_id, remote=?id, ?n);
move |req| {
task::spawn(
async move { this.serve_req(req).await }.instrument(span),
)
}
};
// One-way handler: forwards the message via `serve_oneway`.
let sow = {
let this = this.clone();
let a_send = a_send.clone();
let p_send = p_send.clone();
move |ow| this.serve_oneway(id, a_send, p_send, ow)
};
// The connection lock is held until the end of this loop iteration.
let mut guard = conn.lock().await;
trace!(
"service worker task on {:?} talking to {:?} advancing (step {:?})",
this.self_id,
id,
n
);
let span = debug_span!("service", peer=?this.self_id, remote=?id);
let res = guard.advance(sreq, sow).instrument(span).await;
trace!(
"service worker task on {:?} talking to {:?} advanced (step {:?}), got result {:?}",
this.self_id,
id,
n,
res
);
// A dropped response channel is tolerated; any other error ends
// the service loop for this connection.
match res {
Ok(()) => (),
Err(edelcrantz::Error::ResponseChannelDropped(_)) => (),
Err(_) => break,
}
}
warn!(
"service worker task on {:?} talking to {:?} exited",
this.self_id, id
);
}
});
}
};
Box::pin(fut)
}
/// Sends a `WriteRequest::Write` built from `kv`, `e` and `vals` via
/// `send_req_to_all` and returns the resulting response futures.
pub(crate) async fn send_write_to_all(
    &self,
    kv: KeyVer<L>,
    e: L::Expr,
    vals: Vec<L::Val>,
) -> ResponseFutures<L> {
    let write = replication::WriteRequest::Write(kv, e, vals);
    self.send_req_to_all(Req::WriteReq(write)).await
}
/// Sends a `WriteRequest::Abort` for `kv` via `send_req_to_all` and returns
/// the resulting response futures.
pub(crate) async fn send_abort_to_all(&self, kv: KeyVer<L>) -> ResponseFutures<L> {
    let abort = replication::WriteRequest::Abort(kv);
    self.send_req_to_all(Req::WriteReq(abort)).await
}
/// Sends a `WriteRequest::Finalize` for `kv` to exactly the listed `peers`
/// and returns the resulting response futures.
pub(crate) async fn send_finalize_to_peers(
    &self,
    kv: KeyVer<L>,
    peers: Vec<PeerID>,
) -> ResponseFutures<L> {
    let finalize = replication::WriteRequest::Finalize(kv);
    self.send_req_to_peers(Req::WriteReq(finalize), peers).await
}
/// Sends a `GetRequest` for `kv` via `send_req_to_all` and returns the
/// resulting response futures.
pub(crate) async fn send_get_to_all(&self, kv: KeyVer<L>) -> ResponseFutures<L> {
    let get = replication::GetRequest(kv);
    self.send_req_to_all(Req::GetReq(get)).await
}
/// Serves `req` locally through `serve_req` and presents the outcome in the
/// same shape as a remote response: this node's id paired with `Ok(response)`.
pub(crate) fn send_req_to_self(&self, req: Req<L>) -> ResponseFuture<L> {
    let db = self.clone();
    Box::pin(async move {
        let own_id = db.self_id;
        let response = db.serve_req(req).await;
        (own_id, Ok(response))
    })
}
/// Sends `req` to every peer in `self.connections` and to this node itself.
///
/// The target list is the connection-table keys followed by `self.self_id`;
/// delivery is delegated to `send_req_to_peers`.
pub(crate) async fn send_req_to_all(&self, req: Req<L>) -> ResponseFutures<L> {
    // The read guard is a temporary dropped at the end of this statement.
    let targets: Vec<PeerID> = self
        .connections
        .read()
        .await
        .keys()
        .copied()
        .chain(std::iter::once(self.self_id))
        .collect();
    self.send_req_to_peers(req, targets).await
}
pub(crate) async fn send_req_to_peer(&self, req: Req<L>, peer: PeerID) -> ResponseFuture<L> {
if peer == self.self_id {
self.send_req_to_self(req.clone())
} else {
match self.connections.read().await.get(&peer.clone()) {
None => Box::pin({
let peer = peer.clone();
async move { (peer, Err(edelcrantz::Error::Queue)) }
}),
Some((_, queue)) => {
let fut = queue.enqueue_request(req.clone());
Box::pin(async move { (peer, fut.await) })
}
}
}
}
/// Sends a clone of `req` to each peer in `peers`, in order, and collects the
/// per-peer response futures into a `FuturesUnordered`.
///
/// Only the enqueueing is awaited here; the returned futures still have to be
/// polled by the caller to obtain the responses.
pub(crate) async fn send_req_to_peers(
    &self,
    req: Req<L>,
    peers: Vec<PeerID>,
) -> ResponseFutures<L> {
    trace!("broadcasting request {:?} to peers {:?}", req, peers);
    let pending: ResponseFutures<L> = FuturesUnordered::new();
    for target in peers {
        let response = self.send_req_to_peer(req.clone(), target).await;
        pending.push(response);
    }
    pending
}
}