use capnp::any_pointer;
use capnp::capability::Promise;
use capnp::private::capability::{ClientHook, ParamsHook, PipelineHook, PipelineOp, ResultsHook};
use capnp::Error;
use futures::{Future, FutureExt, TryFutureExt};
use std::cell::RefCell;
use std::rc::{Rc, Weak};
use crate::attach::Attach;
use crate::sender_queue::SenderQueue;
use crate::{broken, local};
/// Shared state behind a queued `Pipeline` handle.
pub struct PipelineInner {
/// Target pipeline, set by `resolve`. While this is `None`, `get_pipelined_cap_move`
/// hands out queued clients instead of forwarding.
redirect: Option<Box<dyn PipelineHook>>,
/// Shared promise that is cloned into every queued client created by
/// `get_pipelined_cap_move`; `Pipeline::drive` joins additional work onto it and
/// `resolve` replaces it with an already-completed promise.
promise_to_drive: futures::future::Shared<Promise<(), Error>>,
/// Weak handles to the queued clients handed out so far, each paired with the
/// pipeline ops that were requested for it. Drained by `resolve`.
clients_to_resolve: SenderQueue<(Weak<RefCell<ClientInner>>, Vec<PipelineOp>), ()>,
}
impl PipelineInner {
    /// Points this queued pipeline at its final target and resolves every client
    /// that was handed out while the pipeline was still pending.
    ///
    /// * `this` - the shared pipeline state to resolve.
    /// * `result` - the target pipeline; an `Err` is turned into a
    ///   `broken::Pipeline` carrying that error.
    ///
    /// Does nothing if a redirect has already been installed.
    fn resolve(this: &Rc<RefCell<Self>>, result: Result<Box<dyn PipelineHook>, Error>) {
        if this.borrow().redirect.is_some() {
            return;
        }
        let pipeline = match result {
            Ok(pipeline_hook) => pipeline_hook,
            Err(e) => Box::new(broken::Pipeline::new(e)),
        };
        this.borrow_mut().redirect = Some(pipeline.add_ref());

        // Drain into a local vector so the mutable borrow of `this` ends before the
        // loop body runs. Iterating directly over `this.borrow_mut()...drain()`
        // would keep the `RefMut` temporary alive for the whole loop, and the body
        // calls into arbitrary hook code (including forwarding of queued calls);
        // any re-entrant access to this pipeline would then panic.
        let waiting_clients: Vec<_> = this.borrow_mut().clients_to_resolve.drain().collect();
        for ((weak_client, ops), waiter) in waiting_clients {
            // Clients that were already dropped need no resolution, but their
            // waiter is still notified below.
            if let Some(client) = weak_client.upgrade() {
                let clienthook = pipeline.get_pipelined_cap_move(ops);
                ClientInner::resolve(&client, Ok(clienthook));
            }
            let _ = waiter.send(());
        }

        // Replace the driving promise with an already-completed one.
        this.borrow_mut().promise_to_drive = Promise::ok(()).shared();
    }
}
/// Handle used to resolve a queued `Pipeline` created by `Pipeline::new`.
pub struct PipelineInnerSender {
/// Weak reference to the pipeline state; taken (left as `None`) when the sender
/// resolves the pipeline.
inner: Option<Weak<RefCell<PipelineInner>>>,
/// When `true`, dropping this sender resolves the pipeline with a broken pipeline.
/// `Pipeline::new` sets this to `true`; `weak_clone` sets it to `false`.
resolve_on_drop: bool,
}
impl PipelineInnerSender {
    /// Creates a second sender for the same pipeline state.
    ///
    /// The copy has `resolve_on_drop` disabled, so dropping it never resolves the
    /// pipeline.
    pub(crate) fn weak_clone(&self) -> Self {
        let inner = self.inner.clone();
        Self {
            inner,
            resolve_on_drop: false,
        }
    }
}
impl Drop for PipelineInnerSender {
fn drop(&mut self) {
if self.resolve_on_drop {
if let Some(weak_queued) = self.inner.take() {
if let Some(pipeline_inner) = weak_queued.upgrade() {
PipelineInner::resolve(
&pipeline_inner,
Ok(Box::new(crate::broken::Pipeline::new(Error::failed(
"PipelineInnerSender was canceled".into(),
)))),
);
}
}
}
}
}
impl PipelineInnerSender {
    /// Consumes the sender and resolves the associated pipeline to `pipeline`.
    ///
    /// If the pipeline state has already been dropped, `pipeline` is simply
    /// discarded.
    pub fn complete(mut self, pipeline: Box<dyn PipelineHook>) {
        // Taking `inner` leaves `None` behind, so the `Drop` impl that runs when
        // `self` goes out of scope finds nothing left to resolve.
        let target = self.inner.take().and_then(|weak_queued| weak_queued.upgrade());
        if let Some(pipeline_inner) = target {
            crate::queued::PipelineInner::resolve(&pipeline_inner, Ok(pipeline));
        }
    }
}
/// A pipeline whose target is not yet known; implements `PipelineHook`.
pub struct Pipeline {
/// State shared between all clones of this pipeline and the queued clients it
/// hands out.
inner: Rc<RefCell<PipelineInner>>,
}
impl Pipeline {
    /// Creates an unresolved pipeline together with the sender that resolves it.
    ///
    /// The returned sender has `resolve_on_drop` enabled.
    pub fn new() -> (PipelineInnerSender, Self) {
        let state = PipelineInner {
            redirect: None,
            promise_to_drive: Promise::ok(()).shared(),
            clients_to_resolve: SenderQueue::new(),
        };
        let inner = Rc::new(RefCell::new(state));
        let sender = PipelineInnerSender {
            inner: Some(Rc::downgrade(&inner)),
            resolve_on_drop: true,
        };
        (sender, Self { inner })
    }

    /// Joins `promise` onto the pipeline's current driving promise.
    ///
    /// The stored promise is replaced by one that completes when both the
    /// previous driving promise and `promise` have succeeded.
    pub fn drive<F>(&mut self, promise: F)
    where
        F: Future<Output = Result<(), Error>> + 'static + Unpin,
    {
        let previous = self.inner.borrow_mut().promise_to_drive.clone();
        let combined = futures::future::try_join(previous, promise).map_ok(|_| ());
        let shared = Promise::from_future(combined).shared();
        self.inner.borrow_mut().promise_to_drive = shared;
    }
}
impl Clone for Pipeline {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl PipelineHook for Pipeline {
    /// Returns a boxed clone of this pipeline handle.
    fn add_ref(&self) -> Box<dyn PipelineHook> {
        let handle: Self = self.clone();
        Box::new(handle)
    }

    /// Borrowing variant of `get_pipelined_cap_move`; copies `ops` into a `Vec`.
    fn get_pipelined_cap(&self, ops: &[PipelineOp]) -> Box<dyn ClientHook> {
        self.get_pipelined_cap_move(ops.to_vec())
    }

    /// Returns the capability reached by applying `ops` to this pipeline.
    ///
    /// Once the pipeline has a redirect the request is forwarded to it. Until
    /// then a queued `Client` is returned and a weak handle to it is recorded so
    /// that `PipelineInner::resolve` can resolve it later.
    fn get_pipelined_cap_move(&self, ops: Vec<PipelineOp>) -> Box<dyn ClientHook> {
        {
            let state = self.inner.borrow();
            if let Some(target) = state.redirect.as_ref() {
                return target.get_pipelined_cap_move(ops);
            }
        }

        // The queued client keeps the pipeline state alive and shares its
        // driving promise.
        let mut pending_client = Client::new(Some(self.inner.clone()));
        let driver = self.inner.borrow().promise_to_drive.clone();
        pending_client.drive(driver);

        // Only a weak handle is stored, so the queue does not keep the client alive.
        let weak_pending = Rc::downgrade(&pending_client.inner);
        self.inner
            .borrow_mut()
            .clients_to_resolve
            .push_detach((weak_pending, ops));

        Box::new(pending_client)
    }
}
/// Shared state behind a queued `Client`.
pub struct ClientInner {
/// Target client, set by `resolve`. While this is `None`, calls are queued.
redirect: Option<Box<dyn ClientHook>>,
/// Strong reference to the pipeline state passed to `Client::new`, if any;
/// cleared by `resolve`.
pipeline_inner: Option<Rc<RefCell<PipelineInner>>>,
/// Promise installed by `Client::drive`; combined with queued work in `call` and
/// `when_more_resolved`, and cleared by `resolve`.
promise_to_drive: Option<futures::future::Shared<Promise<(), Error>>>,
/// Calls made before resolution, stored as (interface id, method id, params,
/// results). On `resolve` each is forwarded to the target and its waiter
/// receives the resulting promise.
call_forwarding_queue:
SenderQueue<(u64, u16, Box<dyn ParamsHook>, Box<dyn ResultsHook>), Promise<(), Error>>,
/// Waiters registered by `when_more_resolved`; each receives a reference to the
/// target client on `resolve`.
client_resolution_queue: SenderQueue<(), Box<dyn ClientHook>>,
}
impl ClientInner {
    /// Points this queued client at its final target, forwards every queued call
    /// to it, and notifies everyone waiting on the resolution.
    ///
    /// * `state` - the shared client state to resolve.
    /// * `result` - the target client; an `Err` is turned into a broken
    ///   capability via `broken::new_cap`.
    ///
    /// # Panics
    ///
    /// Panics if the client already has a redirect.
    pub fn resolve(state: &Rc<RefCell<Self>>, result: Result<Box<dyn ClientHook>, Error>) {
        assert!(state.borrow().redirect.is_none());
        let client = match result {
            Ok(clienthook) => clienthook,
            Err(e) => broken::new_cap(e),
        };
        state.borrow_mut().redirect = Some(client.add_ref());

        // Drain each queue into a local vector before iterating. Looping directly
        // over `state.borrow_mut()...drain()` would keep the `RefMut` temporary
        // alive for the whole loop, so a re-entrant use of this client from inside
        // `client.call` (for example `get_resolved` on a capability passed in the
        // call's own parameters) would panic on the conflicting borrow.
        let queued_calls: Vec<_> = state.borrow_mut().call_forwarding_queue.drain().collect();
        for ((interface_id, method_id, params, results), waiter) in queued_calls {
            let result_promise = client.call(interface_id, method_id, params, results);
            // A dropped receiver means nobody is waiting for this call any more.
            let _ = waiter.send(result_promise);
        }

        let resolution_waiters: Vec<_> = state
            .borrow_mut()
            .client_resolution_queue
            .drain()
            .collect();
        for ((), waiter) in resolution_waiters {
            let _ = waiter.send(client.add_ref());
        }

        // The driving promise and the pipeline reference are no longer needed.
        state.borrow_mut().promise_to_drive.take();
        state.borrow_mut().pipeline_inner.take();
    }
}
/// A client whose target is not yet known; implements `ClientHook`.
pub struct Client {
/// State shared between all references to this client.
pub inner: Rc<RefCell<ClientInner>>,
}
impl Client {
    /// Creates an unresolved client with empty queues and no driving promise.
    ///
    /// `pipeline_inner`, if given, is held by the client until it is resolved.
    pub fn new(pipeline_inner: Option<Rc<RefCell<PipelineInner>>>) -> Self {
        let state = ClientInner {
            promise_to_drive: None,
            pipeline_inner,
            redirect: None,
            call_forwarding_queue: SenderQueue::new(),
            client_resolution_queue: SenderQueue::new(),
        };
        Self {
            inner: Rc::new(RefCell::new(state)),
        }
    }

    /// Installs `promise` as this client's driving promise.
    ///
    /// # Panics
    ///
    /// Panics if a driving promise is already installed.
    pub fn drive<F>(&mut self, promise: F)
    where
        F: Future<Output = Result<(), Error>> + 'static,
    {
        assert!(self.inner.borrow().promise_to_drive.is_none());
        let shared = Promise::from_future(promise).shared();
        self.inner.borrow_mut().promise_to_drive = Some(shared);
    }
}
impl ClientHook for Client {
    /// Returns a new boxed handle that shares this client's state.
    fn add_ref(&self) -> Box<dyn ClientHook> {
        let inner = self.inner.clone();
        Box::new(Self { inner })
    }

    /// Builds a `local::Request` that targets a new reference to this client.
    fn new_call(
        &self,
        interface_id: u64,
        method_id: u16,
        size_hint: Option<::capnp::MessageSize>,
    ) -> ::capnp::capability::Request<any_pointer::Owned, any_pointer::Owned> {
        let request = local::Request::new(interface_id, method_id, size_hint, self.add_ref());
        ::capnp::capability::Request::new(Box::new(request))
    }

    /// Forwards the call to the redirect if there is one, otherwise queues it.
    ///
    /// For a queued call the returned promise yields the result of the forwarded
    /// call. If a driving promise is installed it is polled alongside the queued
    /// call, and an error from it fails the returned promise.
    fn call(
        &self,
        interface_id: u64,
        method_id: u16,
        params: Box<dyn ParamsHook>,
        results: Box<dyn ResultsHook>,
    ) -> Promise<(), Error> {
        {
            let state = self.inner.borrow();
            if let Some(target) = state.redirect.as_ref() {
                return target.call(interface_id, method_id, params, results);
            }
        }

        // A strong reference to the shared state is attached to the queued future.
        let keep_alive = self.inner.clone();
        let forwarded = self
            .inner
            .borrow_mut()
            .call_forwarding_queue
            .push((interface_id, method_id, params, results))
            .attach(keep_alive)
            .and_then(|call_promise| call_promise);

        let driver = self.inner.borrow().promise_to_drive.clone();
        match driver {
            None => Promise::from_future(forwarded),
            Some(driver) => Promise::from_future(async move {
                match futures::future::select(driver, forwarded).await {
                    // The call finished first; the driver is not awaited further.
                    futures::future::Either::Right((outcome, _)) => outcome,
                    // The driver failed first; report its error.
                    futures::future::Either::Left((Err(e), _)) => Err(e),
                    // The driver finished cleanly; keep waiting for the call.
                    futures::future::Either::Left((Ok(()), remaining)) => remaining.await,
                }
            }),
        }
    }

    /// Returns the address of the shared client state as an integer.
    fn get_ptr(&self) -> usize {
        let state = self.inner.borrow();
        let state_ptr: *const _ = &*state;
        state_ptr as usize
    }

    /// Always returns `0`.
    fn get_brand(&self) -> usize {
        0
    }

    /// Returns a clone of the redirect, or `None` while the client is unresolved.
    fn get_resolved(&self) -> Option<Box<dyn ClientHook>> {
        self.inner.borrow().redirect.clone()
    }

    /// Returns a promise for the client this one resolves to.
    ///
    /// If already resolved the promise is immediately ready. Otherwise a waiter
    /// is queued; when a driving promise is installed the returned promise also
    /// waits for it and fails if it fails.
    fn when_more_resolved(&self) -> Option<Promise<Box<dyn ClientHook>, Error>> {
        {
            let state = self.inner.borrow();
            if let Some(target) = state.redirect.as_ref() {
                return Some(Promise::ok(target.add_ref()));
            }
        }

        let resolution = self.inner.borrow_mut().client_resolution_queue.push(());
        let driver = self.inner.borrow().promise_to_drive.clone();
        let pending = match driver {
            Some(driver) => Promise::from_future(
                futures::future::try_join(driver, resolution).map_ok(|(_, client)| client),
            ),
            None => Promise::from_future(resolution),
        };
        Some(pending)
    }

    /// Delegates to `crate::rpc::default_when_resolved_impl`.
    fn when_resolved(&self) -> Promise<(), Error> {
        crate::rpc::default_when_resolved_impl(self)
    }
}