use capnp::capability::Promise;
use capnp::private::capability::ClientHook;
use capnp::Error;
use futures::channel::oneshot;
use futures::{Future, FutureExt, TryFutureExt};
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
pub use crate::rpc::Disconnector;
use crate::task_set::TaskSet;
pub use crate::reconnect::{auto_reconnect, lazy_auto_reconnect, SetTarget};
pub mod rpc_capnp;
pub mod rpc_twoparty_capnp;
#[macro_export]
macro_rules! pry {
($expr:expr) => {
match $expr {
::std::result::Result::Ok(val) => val,
::std::result::Result::Err(err) => {
return ::capnp::capability::Promise::err(::std::convert::From::from(err))
}
}
};
}
mod attach;
mod broken;
mod flow_control;
mod local;
mod queued;
mod reconnect;
mod rpc;
mod sender_queue;
mod split;
mod task_set;
pub mod twoparty;
use capnp::message;
pub trait OutgoingMessage {
fn get_body(&mut self) -> ::capnp::Result<::capnp::any_pointer::Builder>;
fn get_body_as_reader(&self) -> ::capnp::Result<::capnp::any_pointer::Reader>;
fn send(
self: Box<Self>,
) -> (
Promise<(), Error>,
Rc<message::Builder<message::HeapAllocator>>,
);
fn take(self: Box<Self>) -> ::capnp::message::Builder<::capnp::message::HeapAllocator>;
fn size_in_words(&self) -> usize;
}
pub trait IncomingMessage {
fn get_body(&self) -> ::capnp::Result<::capnp::any_pointer::Reader>;
}
pub trait Connection<VatId> {
fn get_peer_vat_id(&self) -> VatId;
fn new_outgoing_message(&mut self, first_segment_word_size: u32) -> Box<dyn OutgoingMessage>;
fn receive_incoming_message(&mut self) -> Promise<Option<Box<dyn IncomingMessage>>, Error>;
fn new_stream(&mut self) -> (Box<dyn FlowController>, Promise<(), Error>) {
let (fc, f) = crate::flow_control::FixedWindowFlowController::new(
crate::flow_control::DEFAULT_WINDOW_SIZE,
);
(Box::new(fc), f)
}
fn shutdown(&mut self, result: ::capnp::Result<()>) -> Promise<(), Error>;
}
pub trait FlowController {
fn send(
&mut self,
message: Box<dyn OutgoingMessage>,
ack: Promise<(), Error>,
) -> Promise<(), Error>;
fn wait_all_acked(&mut self) -> Promise<(), Error>;
}
pub trait VatNetwork<VatId> {
fn connect(&mut self, host_id: VatId) -> Option<Box<dyn Connection<VatId>>>;
fn accept(&mut self) -> Promise<Box<dyn Connection<VatId>>, ::capnp::Error>;
fn drive_until_shutdown(&mut self) -> Promise<(), Error>;
}
#[must_use = "futures do nothing unless polled"]
pub struct RpcSystem<VatId>
where
VatId: 'static,
{
network: Box<dyn crate::VatNetwork<VatId>>,
bootstrap_cap: Box<dyn ClientHook>,
connection_state: Rc<RefCell<Option<Rc<rpc::ConnectionState<VatId>>>>>,
tasks: TaskSet<Error>,
handle: crate::task_set::TaskSetHandle<Error>,
}
impl<VatId> RpcSystem<VatId> {
pub fn new(
mut network: Box<dyn crate::VatNetwork<VatId>>,
bootstrap: Option<::capnp::capability::Client>,
) -> Self {
let bootstrap_cap = match bootstrap {
Some(cap) => cap.hook,
None => broken::new_cap(Error::failed("no bootstrap capability".to_string())),
};
let (mut handle, tasks) = TaskSet::new(Box::new(SystemTaskReaper));
let mut handle1 = handle.clone();
handle.add(network.drive_until_shutdown().then(move |r| {
let r = match r {
Ok(()) => Ok(()),
Err(e) => {
if e.kind != ::capnp::ErrorKind::Disconnected {
Err(e)
} else {
Ok(())
}
}
};
handle1.terminate(r);
Promise::ok(())
}));
let mut result = Self {
network,
bootstrap_cap,
connection_state: Rc::new(RefCell::new(None)),
tasks,
handle: handle.clone(),
};
let accept_loop = result.accept_loop();
handle.add(accept_loop);
result
}
pub fn bootstrap<T>(&mut self, vat_id: VatId) -> T
where
T: ::capnp::capability::FromClientHook,
{
let Some(connection) = self.network.connect(vat_id) else {
return T::new(self.bootstrap_cap.clone());
};
let connection_state = Self::get_connection_state(
&self.connection_state,
self.bootstrap_cap.clone(),
connection,
self.handle.clone(),
);
let hook = rpc::ConnectionState::bootstrap(&connection_state);
T::new(hook)
}
fn accept_loop(&mut self) -> Promise<(), Error> {
let connection_state_ref = self.connection_state.clone();
let bootstrap_cap = self.bootstrap_cap.clone();
let handle = self.handle.clone();
Promise::from_future(self.network.accept().map_ok(move |connection| {
Self::get_connection_state(&connection_state_ref, bootstrap_cap, connection, handle);
}))
}
fn get_connection_state(
connection_state_ref: &Rc<RefCell<Option<Rc<rpc::ConnectionState<VatId>>>>>,
bootstrap_cap: Box<dyn ClientHook>,
connection: Box<dyn crate::Connection<VatId>>,
mut handle: crate::task_set::TaskSetHandle<Error>,
) -> Rc<rpc::ConnectionState<VatId>> {
let (tasks, result) = match *connection_state_ref.borrow() {
Some(ref connection_state) => {
return connection_state.clone();
}
None => {
let (on_disconnect_fulfiller, on_disconnect_promise) =
oneshot::channel::<Promise<(), Error>>();
let connection_state_ref1 = connection_state_ref.clone();
handle.add(on_disconnect_promise.then(move |shutdown_promise| {
*connection_state_ref1.borrow_mut() = None;
match shutdown_promise {
Ok(s) => s,
Err(e) => Promise::err(Error::failed(format!("{e}"))),
}
}));
rpc::ConnectionState::new(bootstrap_cap, connection, on_disconnect_fulfiller)
}
};
*connection_state_ref.borrow_mut() = Some(result.clone());
handle.add(tasks);
result
}
pub fn get_disconnector(&self) -> rpc::Disconnector<VatId> {
rpc::Disconnector::new(self.connection_state.clone())
}
}
impl<VatId> Future for RpcSystem<VatId>
where
VatId: 'static,
{
type Output = Result<(), Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.tasks).poll(cx)
}
}
pub fn new_client<C, S>(s: S) -> C
where
C: capnp::capability::FromServer<S>,
{
capnp::capability::FromClientHook::new(Box::new(local::Client::new(
<C as capnp::capability::FromServer<S>>::from_server(s),
)))
}
pub struct CapabilityServerSet<S, C>
where
C: capnp::capability::FromServer<S>,
{
caps: std::collections::HashMap<usize, Weak<RefCell<C::Dispatch>>>,
}
impl<S, C> Default for CapabilityServerSet<S, C>
where
C: capnp::capability::FromServer<S>,
{
fn default() -> Self {
Self {
caps: std::default::Default::default(),
}
}
}
impl<S, C> CapabilityServerSet<S, C>
where
C: capnp::capability::FromServer<S>,
{
pub fn new() -> Self {
Self::default()
}
pub fn new_client(&mut self, s: S) -> C {
let dispatch = <C as capnp::capability::FromServer<S>>::from_server(s);
let wrapped = Rc::new(RefCell::new(dispatch));
let ptr = wrapped.as_ptr() as usize;
self.caps.insert(ptr, Rc::downgrade(&wrapped));
capnp::capability::FromClientHook::new(Box::new(local::Client::from_rc(wrapped)))
}
pub async fn get_local_server(&self, client: &C) -> Option<Rc<RefCell<C::Dispatch>>>
where
C: capnp::capability::FromClientHook,
{
let resolved: C = capnp::capability::get_resolved_cap(
capnp::capability::FromClientHook::new(client.as_client_hook().add_ref()),
)
.await;
let hook = resolved.into_client_hook();
let ptr = hook.get_ptr();
self.caps.get(&ptr).and_then(|c| c.upgrade())
}
pub fn get_local_server_of_resolved(&self, client: &C) -> Option<Rc<RefCell<C::Dispatch>>>
where
C: capnp::capability::FromClientHook,
{
let hook = client.as_client_hook();
let ptr = hook.get_ptr();
self.caps.get(&ptr).and_then(|c| c.upgrade())
}
pub fn gc(&mut self) {
self.caps.retain(|_, c| c.strong_count() > 0);
}
}
pub fn new_future_client<T>(
client_future: impl ::futures::Future<Output = Result<T, Error>> + 'static,
) -> T
where
T: ::capnp::capability::FromClientHook,
{
let mut queued_client = crate::queued::Client::new(None);
let weak_client = Rc::downgrade(&queued_client.inner);
queued_client.drive(client_future.then(move |r| {
if let Some(queued_inner) = weak_client.upgrade() {
crate::queued::ClientInner::resolve(&queued_inner, r.map(|c| c.into_client_hook()));
}
Promise::ok(())
}));
T::new(Box::new(queued_client))
}
struct SystemTaskReaper;
impl crate::task_set::TaskReaper<Error> for SystemTaskReaper {
fn task_failed(&mut self, error: Error) {
println!("ERROR: {error}");
}
}
pub struct ImbuedMessageBuilder<A>
where
A: ::capnp::message::Allocator,
{
builder: ::capnp::message::Builder<A>,
cap_table: Vec<Option<Box<dyn (::capnp::private::capability::ClientHook)>>>,
}
impl<A> ImbuedMessageBuilder<A>
where
A: ::capnp::message::Allocator,
{
pub fn new(allocator: A) -> Self {
Self {
builder: ::capnp::message::Builder::new(allocator),
cap_table: Vec::new(),
}
}
pub fn get_root<'a, T>(&'a mut self) -> ::capnp::Result<T>
where
T: ::capnp::traits::FromPointerBuilder<'a>,
{
use capnp::traits::ImbueMut;
let mut root: ::capnp::any_pointer::Builder = self.builder.get_root()?;
root.imbue_mut(&mut self.cap_table);
root.get_as()
}
pub fn set_root<T: ::capnp::traits::Owned>(
&mut self,
value: impl ::capnp::traits::SetterInput<T>,
) -> ::capnp::Result<()> {
use capnp::traits::ImbueMut;
let mut root: ::capnp::any_pointer::Builder = self.builder.get_root()?;
root.imbue_mut(&mut self.cap_table);
root.set_as(value)
}
}
fn canceled_to_error(_e: futures::channel::oneshot::Canceled) -> Error {
Error::failed("oneshot was canceled".to_string())
}