capnp_rpc/
rpc.rs

1// Copyright (c) 2013-2015 Sandstorm Development Group, Inc. and contributors
2// Licensed under the MIT License:
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in
12// all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20// THE SOFTWARE.
21
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use capnp::any_pointer;
26use capnp::capability::Promise;
27use capnp::private::capability::{
28    ClientHook, ParamsHook, PipelineHook, PipelineOp, RequestHook, ResponseHook, ResultsHook,
29};
30use capnp::Error;
31
32use futures::channel::oneshot;
33use futures::{future, Future, FutureExt, TryFutureExt};
34
35use std::cell::{Cell, RefCell};
36use std::cmp::Reverse;
37use std::collections::binary_heap::BinaryHeap;
38use std::collections::hash_map::{self, HashMap};
39use std::mem;
40use std::rc::{Rc, Weak};
41
42use crate::attach::Attach;
43use crate::local::ResultsDoneHook;
44use crate::rpc_capnp::{
45    bootstrap, call, cap_descriptor, disembargo, exception, finish, message, message_target,
46    payload, promised_answer, resolve, return_,
47};
48use crate::task_set::TaskSet;
49use crate::{broken, local, queued};
50
51pub type QuestionId = u32;
52pub type AnswerId = QuestionId;
53pub type ExportId = u32;
54pub type ImportId = ExportId;
55
56pub struct ImportTable<T> {
57    slots: HashMap<u32, T>,
58}
59
60impl<T> ImportTable<T> {
61    pub fn new() -> Self {
62        Self {
63            slots: HashMap::new(),
64        }
65    }
66}
67
68struct ExportTable<T> {
69    slots: Vec<Option<T>>,
70
71    // prioritize lower values
72    free_ids: BinaryHeap<Reverse<u32>>,
73}
74
75struct ExportTableIter<'a, T>
76where
77    T: 'a,
78{
79    table: &'a ExportTable<T>,
80    idx: usize,
81}
82
83impl<'a, T> ::std::iter::Iterator for ExportTableIter<'a, T>
84where
85    T: 'a,
86{
87    type Item = &'a T;
88    fn next(&mut self) -> Option<&'a T> {
89        while self.idx < self.table.slots.len() {
90            let idx = self.idx;
91            self.idx += 1;
92            if let Some(v) = &self.table.slots[idx] {
93                return Some(v);
94            }
95        }
96        None
97    }
98}
99
100impl<T> ExportTable<T> {
101    pub fn new() -> Self {
102        Self {
103            slots: Vec::new(),
104            free_ids: BinaryHeap::new(),
105        }
106    }
107
108    pub fn erase(&mut self, id: u32) {
109        self.slots[id as usize] = None;
110        self.free_ids.push(Reverse(id));
111    }
112
113    pub fn push(&mut self, val: T) -> u32 {
114        match self.free_ids.pop() {
115            Some(Reverse(id)) => {
116                self.slots[id as usize] = Some(val);
117                id
118            }
119            None => {
120                self.slots.push(Some(val));
121                self.slots.len() as u32 - 1
122            }
123        }
124    }
125
126    pub fn find(&mut self, id: u32) -> Option<&mut T> {
127        let idx = id as usize;
128        if idx < self.slots.len() {
129            self.slots[idx].as_mut()
130        } else {
131            None
132        }
133    }
134
135    pub fn iter(&self) -> ExportTableIter<'_, T> {
136        ExportTableIter {
137            table: self,
138            idx: 0,
139        }
140    }
141}
142
143struct Question<VatId>
144where
145    VatId: 'static,
146{
147    is_awaiting_return: bool,
148
149    #[allow(dead_code)]
150    param_exports: Vec<ExportId>,
151
152    #[allow(dead_code)]
153    is_tail_call: bool,
154
155    /// The local QuestionRef, set to None when it is destroyed.
156    self_ref: Option<Weak<RefCell<QuestionRef<VatId>>>>,
157
158    /// If true, don't send a Finish message.
159    skip_finish: bool,
160}
161
162impl<VatId> Question<VatId> {
163    fn new() -> Self {
164        Self {
165            is_awaiting_return: true,
166            param_exports: Vec::new(),
167            is_tail_call: false,
168            self_ref: None,
169            skip_finish: false,
170        }
171    }
172}
173
174/// A reference to an entry on the question table.  Used to detect when the `Finish` message
175/// can be sent.
176struct QuestionRef<VatId>
177where
178    VatId: 'static,
179{
180    connection_state: Rc<ConnectionState<VatId>>,
181    id: QuestionId,
182    fulfiller: Option<oneshot::Sender<Promise<Response<VatId>, Error>>>,
183}
184
185impl<VatId> QuestionRef<VatId> {
186    fn new(
187        state: Rc<ConnectionState<VatId>>,
188        id: QuestionId,
189        fulfiller: oneshot::Sender<Promise<Response<VatId>, Error>>,
190    ) -> Self {
191        Self {
192            connection_state: state,
193            id,
194            fulfiller: Some(fulfiller),
195        }
196    }
197    fn fulfill(&mut self, response: Promise<Response<VatId>, Error>) {
198        if let Some(fulfiller) = self.fulfiller.take() {
199            let _ = fulfiller.send(response);
200        }
201    }
202
203    fn reject(&mut self, err: Error) {
204        if let Some(fulfiller) = self.fulfiller.take() {
205            let _ = fulfiller.send(Promise::err(err));
206        }
207    }
208}
209
210impl<VatId> Drop for QuestionRef<VatId> {
211    fn drop(&mut self) {
212        let mut questions = self.connection_state.questions.borrow_mut();
213        let Some(q) = &mut questions.slots[self.id as usize] else {
214            unreachable!()
215        };
216        if let Ok(ref mut c) = *self.connection_state.connection.borrow_mut() {
217            if !q.skip_finish {
218                let mut message = c.new_outgoing_message(5);
219                {
220                    let root: message::Builder = message.get_body().unwrap().init_as();
221                    let mut builder = root.init_finish();
222                    builder.set_question_id(self.id);
223
224                    // If we're still awaiting a return, then this request is being
225                    // canceled, and we're going to ignore any capabilities in the return
226                    // message, so set releaseResultCaps true. If we already received the
227                    // return, then we've already built local proxies for the caps and will
228                    // send Release messages when those are destroyed.
229                    builder.set_release_result_caps(q.is_awaiting_return);
230                }
231                let _ = message.send();
232            }
233        }
234
235        if q.is_awaiting_return {
236            // Still waiting for return, so just remove the QuestionRef pointer from the table.
237            q.self_ref = None;
238        } else {
239            // Call has already returned, so we can now remove it from the table.
240            questions.erase(self.id)
241        }
242    }
243}
244
245struct Answer<VatId>
246where
247    VatId: 'static,
248{
249    return_has_been_sent: bool,
250
251    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.
252    pipeline: Option<Box<dyn PipelineHook>>,
253
254    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
255    // result, to be picked up by a subsequent `Return`.
256    redirected_results: Option<Promise<Response<VatId>, Error>>,
257
258    received_finish: Rc<Cell<bool>>,
259    call_completion_promise: Option<Promise<(), Error>>,
260
261    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
262    // will need to be released.
263    result_exports: Vec<ExportId>,
264}
265
266impl<VatId> Answer<VatId> {
267    fn new() -> Self {
268        Self {
269            return_has_been_sent: false,
270            pipeline: None,
271            redirected_results: None,
272            received_finish: Rc::new(Cell::new(false)),
273            call_completion_promise: None,
274            result_exports: Vec::new(),
275        }
276    }
277}
278
279pub struct Export {
280    refcount: u32,
281
282    /// If true, this is the canonical export entry for this clientHook, that is,
283    /// `exports_by_cap[clientHook]` points to this entry.
284    canonical: bool,
285
286    client_hook: Box<dyn ClientHook>,
287
288    // If this export is a promise (not a settled capability), the `resolve_op` represents the
289    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.
290    resolve_op: Promise<(), Error>,
291}
292
293impl Export {
294    fn new(client_hook: Box<dyn ClientHook>) -> Self {
295        Self {
296            refcount: 1,
297            canonical: false,
298            client_hook,
299            resolve_op: Promise::err(Error::failed("no resolve op".to_string())),
300        }
301    }
302}
303
304pub struct Import<VatId>
305where
306    VatId: 'static,
307{
308    import_client: Weak<RefCell<ImportClient<VatId>>>,
309
310    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
311    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
312    // resolved and the import is no longer needed).
313    app_client: Option<WeakClient<VatId>>,
314
315    // If non-null, the import is a promise.
316    promise_client_to_resolve: Option<Weak<RefCell<PromiseClient<VatId>>>>,
317}
318
319impl<VatId> Import<VatId> {
320    fn new(import_client: &Rc<RefCell<ImportClient<VatId>>>) -> Self {
321        Self {
322            import_client: Rc::downgrade(import_client),
323            app_client: None,
324            promise_client_to_resolve: None,
325        }
326    }
327}
328
329struct Embargo {
330    fulfiller: Option<oneshot::Sender<Result<(), Error>>>,
331}
332
333impl Embargo {
334    fn new(fulfiller: oneshot::Sender<Result<(), Error>>) -> Self {
335        Self {
336            fulfiller: Some(fulfiller),
337        }
338    }
339}
340
341fn to_pipeline_ops(
342    ops: ::capnp::struct_list::Reader<promised_answer::op::Owned>,
343) -> ::capnp::Result<Vec<PipelineOp>> {
344    let mut result = Vec::new();
345    for op in ops {
346        match op.which()? {
347            promised_answer::op::Noop(()) => {
348                result.push(PipelineOp::Noop);
349            }
350            promised_answer::op::GetPointerField(idx) => {
351                result.push(PipelineOp::GetPointerField(idx));
352            }
353        }
354    }
355    Ok(result)
356}
357
358fn from_error(error: &Error, mut builder: exception::Builder) {
359    let typ = match error.kind {
360        ::capnp::ErrorKind::Failed => exception::Type::Failed,
361        ::capnp::ErrorKind::Overloaded => exception::Type::Overloaded,
362        ::capnp::ErrorKind::Disconnected => exception::Type::Disconnected,
363        ::capnp::ErrorKind::Unimplemented => exception::Type::Unimplemented,
364        ::capnp::ErrorKind::SettingDynamicCapabilitiesIsUnsupported => {
365            exception::Type::Unimplemented
366        }
367        _ => exception::Type::Failed,
368    };
369    builder.set_type(typ);
370    match error.kind {
371        ::capnp::ErrorKind::Failed
372        | ::capnp::ErrorKind::Overloaded
373        | ::capnp::ErrorKind::Disconnected
374        | ::capnp::ErrorKind::Unimplemented => {
375            builder.set_reason(&error.extra);
376        }
377        _ => {
378            // There is extra information in `error.kind` that is not
379            // captured by `typ`. We call `error.to_string()` to allow that
380            // information to be recorded in the `reason` field.
381            builder.set_reason(error.to_string());
382        }
383    }
384}
385
386fn remote_exception_to_error(exception: exception::Reader) -> Error {
387    let (kind, reason) = match (exception.get_type(), exception.get_reason()) {
388        (Ok(exception::Type::Failed), Ok(reason)) => (::capnp::ErrorKind::Failed, reason),
389        (Ok(exception::Type::Overloaded), Ok(reason)) => (::capnp::ErrorKind::Overloaded, reason),
390        (Ok(exception::Type::Disconnected), Ok(reason)) => {
391            (::capnp::ErrorKind::Disconnected, reason)
392        }
393        (Ok(exception::Type::Unimplemented), Ok(reason)) => {
394            (::capnp::ErrorKind::Unimplemented, reason)
395        }
396        _ => (::capnp::ErrorKind::Failed, "(malformed error)".into()),
397    };
398    let reason_str = reason
399        .to_str()
400        .unwrap_or("<malformed utf-8 in error reason>");
401    Error {
402        extra: format!("remote exception: {reason_str}"),
403        kind,
404    }
405}
406
407pub struct ConnectionErrorHandler<VatId>
408where
409    VatId: 'static,
410{
411    weak_state: Weak<ConnectionState<VatId>>,
412}
413
414impl<VatId> ConnectionErrorHandler<VatId> {
415    fn new(weak_state: Weak<ConnectionState<VatId>>) -> Self {
416        Self { weak_state }
417    }
418}
419
420impl<VatId> crate::task_set::TaskReaper<capnp::Error> for ConnectionErrorHandler<VatId> {
421    fn task_failed(&mut self, error: ::capnp::Error) {
422        if let Some(state) = self.weak_state.upgrade() {
423            state.disconnect(error)
424        }
425    }
426}
427
428pub struct ConnectionState<VatId>
429where
430    VatId: 'static,
431{
432    bootstrap_cap: Box<dyn ClientHook>,
433    exports: RefCell<ExportTable<Export>>,
434    questions: RefCell<ExportTable<Question<VatId>>>,
435    answers: RefCell<ImportTable<Answer<VatId>>>,
436    imports: RefCell<ImportTable<Import<VatId>>>,
437
438    /// Exports keyed by ClientHook::get_ptr().
439    exports_by_cap: RefCell<HashMap<usize, ExportId>>,
440
441    embargoes: RefCell<ExportTable<Embargo>>,
442
443    tasks: RefCell<Option<crate::task_set::TaskSetHandle<capnp::Error>>>,
444    connection: RefCell<::std::result::Result<Box<dyn crate::Connection<VatId>>, ::capnp::Error>>,
445    disconnect_fulfiller: RefCell<Option<oneshot::Sender<Promise<(), Error>>>>,
446
447    client_downcast_map: RefCell<HashMap<usize, WeakClient<VatId>>>,
448}
449
450impl<VatId> ConnectionState<VatId> {
451    pub fn new(
452        bootstrap_cap: Box<dyn ClientHook>,
453        connection: Box<dyn crate::Connection<VatId>>,
454        disconnect_fulfiller: oneshot::Sender<Promise<(), Error>>,
455    ) -> (TaskSet<Error>, Rc<Self>) {
456        let state = Rc::new(Self {
457            bootstrap_cap,
458            exports: RefCell::new(ExportTable::new()),
459            questions: RefCell::new(ExportTable::new()),
460            answers: RefCell::new(ImportTable::new()),
461            imports: RefCell::new(ImportTable::new()),
462            exports_by_cap: RefCell::new(HashMap::new()),
463            embargoes: RefCell::new(ExportTable::new()),
464            tasks: RefCell::new(None),
465            connection: RefCell::new(Ok(connection)),
466            disconnect_fulfiller: RefCell::new(Some(disconnect_fulfiller)),
467            client_downcast_map: RefCell::new(HashMap::new()),
468        });
469        let (mut handle, tasks) =
470            TaskSet::new(Box::new(ConnectionErrorHandler::new(Rc::downgrade(&state))));
471
472        handle.add(Self::message_loop(Rc::downgrade(&state)));
473        *state.tasks.borrow_mut() = Some(handle);
474        (tasks, state)
475    }
476
477    fn new_outgoing_message(
478        &self,
479        first_segment_words: u32,
480    ) -> capnp::Result<Box<dyn crate::OutgoingMessage>> {
481        match self.connection.borrow_mut().as_mut() {
482            Err(e) => Err(e.clone()),
483            Ok(c) => Ok(c.new_outgoing_message(first_segment_words)),
484        }
485    }
486
487    fn disconnect(&self, error: ::capnp::Error) {
488        if self.connection.borrow().is_err() {
489            // Already disconnected.
490            return;
491        }
492
493        // Carefully pull all the objects out of the tables prior to releasing them because their
494        // destructors could come back and mess with the tables.
495        let mut pipelines_to_release = Vec::new();
496        let mut clients_to_release = Vec::new();
497        //let mut tail_calls_to_release = Vec::new();
498        let mut resolve_ops_to_release = Vec::new();
499
500        for q in self.questions.borrow().iter() {
501            if let Some(ref weak_question_ref) = q.self_ref {
502                if let Some(question_ref) = weak_question_ref.upgrade() {
503                    question_ref.borrow_mut().reject(error.clone());
504                }
505            }
506        }
507
508        {
509            let answer_slots = &mut self.answers.borrow_mut().slots;
510            for (_, ref mut answer) in answer_slots.iter_mut() {
511                // TODO tail call
512                pipelines_to_release.push(answer.pipeline.take())
513            }
514        }
515
516        let len = self.exports.borrow().slots.len();
517        for idx in 0..len {
518            if let Some(exp) = self.exports.borrow_mut().slots[idx].take() {
519                let Export {
520                    client_hook,
521                    resolve_op,
522                    ..
523                } = exp;
524                clients_to_release.push(client_hook);
525                resolve_ops_to_release.push(resolve_op);
526            }
527        }
528        *self.exports.borrow_mut() = ExportTable::new();
529
530        {
531            let import_slots = &mut self.imports.borrow_mut().slots;
532            for (_, ref mut import) in import_slots.iter_mut() {
533                if let Some(f) = import.promise_client_to_resolve.take() {
534                    if let Some(promise_client) = f.upgrade() {
535                        promise_client.borrow_mut().resolve(Err(error.clone()));
536                    }
537                }
538            }
539        }
540
541        let len = self.embargoes.borrow().slots.len();
542        for idx in 0..len {
543            if let Some(ref mut emb) = self.embargoes.borrow_mut().slots[idx] {
544                if let Some(f) = emb.fulfiller.take() {
545                    let _ = f.send(Err(error.clone()));
546                }
547            }
548        }
549        *self.embargoes.borrow_mut() = ExportTable::new();
550
551        drop(pipelines_to_release);
552        drop(clients_to_release);
553        drop(resolve_ops_to_release);
554        // TODO drop tail calls
555
556        match *self.connection.borrow_mut() {
557            Ok(ref mut c) => {
558                let mut message = c.new_outgoing_message(100); // TODO estimate size
559                {
560                    let builder = message
561                        .get_body()
562                        .unwrap()
563                        .init_as::<message::Builder>()
564                        .init_abort();
565                    from_error(&error, builder);
566                }
567                let _ = message.send();
568            }
569            Err(_) => unreachable!(),
570        }
571
572        let connection = mem::replace(&mut *self.connection.borrow_mut(), Err(error.clone()));
573
574        let Ok(mut c) = connection else {
575            unreachable!()
576        };
577        let promise = c.shutdown(Err(error)).then(|r| match r {
578            Ok(()) => Promise::ok(()),
579            Err(e) => {
580                if e.kind != ::capnp::ErrorKind::Disconnected {
581                    // Don't report disconnects as an error.
582                    Promise::err(e)
583                } else {
584                    Promise::ok(())
585                }
586            }
587        });
588        let Some(fulfiller) = self.disconnect_fulfiller.borrow_mut().take() else {
589            unreachable!()
590        };
591        let _ = fulfiller.send(Promise::from_future(promise.attach(c)));
592    }
593
594    // Transform a future into a promise that gets executed even if it is never polled.
595    // Dropping the returned promise cancels the computation.
596    fn eagerly_evaluate<T, F>(&self, task: F) -> Promise<T, Error>
597    where
598        F: Future<Output = Result<T, Error>> + 'static + Unpin,
599        T: 'static,
600    {
601        let (tx, rx) = oneshot::channel::<Result<T, Error>>();
602        let (tx2, rx2) = oneshot::channel::<()>();
603        let f1 = Box::pin(task.map(move |r| {
604            let _ = tx.send(r);
605        })) as Pin<Box<dyn Future<Output = ()> + Unpin>>;
606        let f2 = Box::pin(rx2.map(drop)) as Pin<Box<dyn Future<Output = ()> + Unpin>>;
607
608        self.add_task(future::select(f1, f2).map(|_| Ok(())));
609        Promise::from_future(rx.map_err(crate::canceled_to_error).map(|r| {
610            drop(tx2);
611            r?
612        }))
613    }
614
615    fn add_task<F>(&self, task: F)
616    where
617        F: Future<Output = Result<(), Error>> + 'static,
618    {
619        if let Some(ref mut tasks) = *self.tasks.borrow_mut() {
620            tasks.add(task);
621        }
622    }
623
624    pub fn bootstrap(state: &Rc<Self>) -> Box<dyn ClientHook> {
625        let question_id = state.questions.borrow_mut().push(Question::new());
626
627        let (fulfiller, promise) = oneshot::channel();
628        let promise = promise.map_err(crate::canceled_to_error);
629        let promise = promise.and_then(|response_promise| response_promise);
630        let question_ref = Rc::new(RefCell::new(QuestionRef::new(
631            state.clone(),
632            question_id,
633            fulfiller,
634        )));
635        let promise = promise.attach(question_ref.clone());
636        match state.questions.borrow_mut().slots[question_id as usize] {
637            Some(ref mut q) => {
638                q.self_ref = Some(Rc::downgrade(&question_ref));
639            }
640            None => unreachable!(),
641        }
642        match *state.connection.borrow_mut() {
643            Ok(ref mut c) => {
644                let mut message = c.new_outgoing_message(5);
645                {
646                    let mut builder = message
647                        .get_body()
648                        .unwrap()
649                        .init_as::<message::Builder>()
650                        .init_bootstrap();
651                    builder.set_question_id(question_id);
652                }
653                let _ = message.send();
654            }
655            Err(_) => panic!(),
656        }
657
658        let pipeline = Pipeline::new(state, question_ref, Some(Promise::from_future(promise)));
659        pipeline.get_pipelined_cap_move(Vec::new())
660    }
661
662    fn message_loop(weak_state: Weak<Self>) -> Promise<(), capnp::Error> {
663        let Some(state) = weak_state.upgrade() else {
664            return Promise::err(Error::disconnected(
665                "message loop cannot continue without a connection".into(),
666            ));
667        };
668
669        let promise = match *state.connection.borrow_mut() {
670            Err(_) => return Promise::ok(()),
671            Ok(ref mut connection) => connection.receive_incoming_message(),
672        };
673
674        Promise::from_future(async move {
675            match promise.await? {
676                Some(m) => {
677                    Self::handle_message(&weak_state, m)?;
678                    weak_state
679                        .upgrade()
680                        .expect("message loop outlived connection state?")
681                        .add_task(Self::message_loop(weak_state));
682                }
683                None => {
684                    weak_state
685                        .upgrade()
686                        .expect("message loop outlived connection state?")
687                        .disconnect(Error::disconnected("Peer disconnected.".to_string()));
688                }
689            }
690            Ok(())
691        })
692    }
693
694    fn send_unimplemented(
695        connection_state: &Rc<Self>,
696        message: &dyn crate::IncomingMessage,
697    ) -> capnp::Result<()> {
698        let mut out_message = connection_state.new_outgoing_message(50)?; // XXX size hint
699        {
700            let mut root: message::Builder = out_message.get_body()?.get_as()?;
701            root.set_unimplemented(message.get_body()?.get_as()?)?;
702        }
703        let _ = out_message.send();
704        Ok(())
705    }
706
707    fn handle_unimplemented(
708        connection_state: &Rc<Self>,
709        message: message::Reader,
710    ) -> capnp::Result<()> {
711        match message.which()? {
712            message::Resolve(resolve) => {
713                let resolve = resolve?;
714                match resolve.which()? {
715                    resolve::Cap(c) => match c?.which()? {
716                        cap_descriptor::None(()) => (),
717                        cap_descriptor::SenderHosted(export_id) => {
718                            connection_state.release_export(export_id, 1)?;
719                        }
720                        cap_descriptor::SenderPromise(export_id) => {
721                            connection_state.release_export(export_id, 1)?;
722                        }
723                        cap_descriptor::ReceiverAnswer(_) | cap_descriptor::ReceiverHosted(_) => (),
724                        cap_descriptor::ThirdPartyHosted(_) => {
725                            return Err(Error::failed(
726                                "Peer claims we resolved a ThirdPartyHosted cap.".to_string(),
727                            ));
728                        }
729                    },
730                    resolve::Exception(_) => (),
731                }
732            }
733            _ => {
734                return Err(Error::failed(
735                    "Peer did not implement required RPC message type.".to_string(),
736                ));
737            }
738        }
739        Ok(())
740    }
741
742    fn handle_bootstrap(
743        connection_state: &Rc<Self>,
744        bootstrap: bootstrap::Reader,
745    ) -> capnp::Result<()> {
746        use ::capnp::traits::ImbueMut;
747
748        let answer_id = bootstrap.get_question_id();
749        if connection_state.connection.borrow().is_err() {
750            // Disconnected; ignore.
751            return Ok(());
752        }
753
754        let mut response = connection_state.new_outgoing_message(10)?;
755
756        let result_exports = {
757            let mut ret = response
758                .get_body()?
759                .init_as::<message::Builder>()
760                .init_return();
761            ret.set_answer_id(answer_id);
762
763            let cap = connection_state.bootstrap_cap.clone();
764            let mut cap_table = Vec::new();
765            let mut payload = ret.init_results();
766            {
767                let mut content = payload.reborrow().get_content();
768                content.imbue_mut(&mut cap_table);
769                content.set_as_capability(cap);
770            }
771            assert_eq!(cap_table.len(), 1);
772
773            Self::write_descriptors(connection_state, &cap_table, payload)
774        };
775
776        let slots = &mut connection_state.answers.borrow_mut().slots;
777        let hash_map::Entry::Vacant(slot) = slots.entry(answer_id) else {
778            connection_state.release_exports(&result_exports)?;
779            return Err(Error::failed("questionId is already in use".to_string()));
780        };
781        let mut answer = Answer::new();
782        answer.return_has_been_sent = true;
783        answer.result_exports = result_exports;
784        answer.pipeline = Some(Box::new(SingleCapPipeline::new(
785            connection_state.bootstrap_cap.clone(),
786        )));
787        slot.insert(answer);
788
789        let _ = response.send();
790        Ok(())
791    }
792
793    fn handle_finish(connection_state: &Rc<Self>, finish: finish::Reader) -> capnp::Result<()> {
794        let mut exports_to_release = Vec::new();
795        let answer_id = finish.get_question_id();
796
797        let answers_slots = &mut connection_state.answers.borrow_mut().slots;
798        match answers_slots.entry(answer_id) {
799            hash_map::Entry::Vacant(_) => {
800                // The `Finish` message targets a question ID that isn't present in our answer table.
801                // Probably, we sent a `Return` with `noFinishNeeded = true`, but the other side didn't
802                // recognize this hint and sent a `Finish` anyway, or the `Finish` was already in-flight at
803                // the time we sent the `Return`. We can silently ignore this.
804            }
805            hash_map::Entry::Occupied(mut entry) => {
806                let answer = entry.get_mut();
807                answer.received_finish.set(true);
808
809                if finish.get_release_result_caps() {
810                    exports_to_release = ::std::mem::take(&mut answer.result_exports);
811                }
812
813                // If the pipeline has not been cloned, the following two lines cancel the call.
814                answer.pipeline.take();
815                answer.call_completion_promise.take();
816
817                if answer.return_has_been_sent {
818                    entry.remove();
819                }
820            }
821        }
822
823        connection_state.release_exports(&exports_to_release)?;
824        Ok(())
825    }
826
827    fn handle_resolve(connection_state: &Rc<Self>, resolve: resolve::Reader) -> capnp::Result<()> {
828        let replacement_or_error = match resolve.which()? {
829            resolve::Cap(c) => match Self::receive_cap(connection_state, c?)? {
830                Some(cap) => Ok(cap),
831                None => {
832                    return Err(Error::failed(
833                        "'Resolve' contained 'CapDescriptor.none'.".to_string(),
834                    ));
835                }
836            },
837            resolve::Exception(e) => {
838                // We can't set `replacement` to a new broken cap here because this will
839                // confuse PromiseClient::Resolve() into thinking that the remote
840                // promise resolved to a local capability and therefore a Disembargo is
841                // needed. We must actually reject the promise.
842                Err(remote_exception_to_error(e?))
843            }
844        };
845
846        // If the import is in the table, fulfill it.
847        let slots = &mut connection_state.imports.borrow_mut().slots;
848        if let Some(import) = slots.get_mut(&resolve.get_promise_id()) {
849            match import.promise_client_to_resolve.take() {
850                Some(weak_promise_client) => {
851                    if let Some(promise_client) = weak_promise_client.upgrade() {
852                        promise_client.borrow_mut().resolve(replacement_or_error);
853                    }
854                }
855                None => {
856                    return Err(Error::failed(
857                        "Got 'Resolve' for a non-promise import.".to_string(),
858                    ));
859                }
860            }
861        }
862        Ok(())
863    }
864
865    fn handle_disembargo(
866        connection_state: &Rc<Self>,
867        disembargo: disembargo::Reader,
868    ) -> capnp::Result<()> {
869        let context = disembargo.get_context();
870        match context.which()? {
871            disembargo::context::SenderLoopback(embargo_id) => {
872                let mut target = connection_state.get_message_target(disembargo.get_target()?)?;
873                while let Some(resolved) = target.get_resolved() {
874                    target = resolved;
875                }
876
877                if target.get_brand() != connection_state.get_brand() {
878                    return Err(Error::failed(
879                        "'Disembargo' of type 'senderLoopback' sent to an object that does not point \
880                         back to the sender.".to_string()));
881                }
882
883                let connection_state_ref = connection_state.clone();
884                let connection_state_ref1 = connection_state.clone();
885                let task = async move {
886                    if let Ok(ref mut c) = *connection_state_ref.connection.borrow_mut() {
887                        let mut message = c.new_outgoing_message(100); // TODO estimate size
888                        {
889                            let root: message::Builder = message.get_body()?.init_as();
890                            let mut disembargo = root.init_disembargo();
891                            disembargo
892                                .reborrow()
893                                .init_context()
894                                .set_receiver_loopback(embargo_id);
895
896                            let redirect =
897                                match Client::from_ptr(target.get_ptr(), &connection_state_ref1) {
898                                    Some(c) => c.write_target(disembargo.init_target()),
899                                    None => unreachable!(),
900                                };
901                            if redirect.is_some() {
902                                return Err(Error::failed(
903                                    "'Disembargo' of type 'senderLoopback' sent to an object that \
904                                     does not appear to have been the subject of a previous \
905                                     'Resolve' message."
906                                        .to_string(),
907                                ));
908                            }
909                        }
910                        let _ = message.send();
911                    }
912                    Ok(())
913                };
914                connection_state.add_task(task);
915            }
916            disembargo::context::ReceiverLoopback(embargo_id) => {
917                if let Some(embargo) = connection_state.embargoes.borrow_mut().find(embargo_id) {
918                    let fulfiller = embargo.fulfiller.take().unwrap();
919                    let _ = fulfiller.send(Ok(()));
920                } else {
921                    return Err(Error::failed(
922                        "Invalid embargo ID in `Disembargo.context.receiverLoopback".to_string(),
923                    ));
924                }
925                connection_state.embargoes.borrow_mut().erase(embargo_id);
926            }
927            disembargo::context::Accept(_) | disembargo::context::Provide(_) => {
928                return Err(Error::unimplemented(
929                    "Disembargo::Context::Provide/Accept not implemented".to_string(),
930                ));
931            }
932        }
933        Ok(())
934    }
935
936    fn handle_message(
937        weak_state: &Weak<Self>,
938        message: Box<dyn crate::IncomingMessage>,
939    ) -> ::capnp::Result<()> {
940        let Some(connection_state) = weak_state.upgrade() else {
941            return Err(Error::disconnected(
942                "handle_message() cannot continue without a connection".into(),
943            ));
944        };
945
946        let reader = message.get_body()?.get_as::<message::Reader>()?;
947        match reader.which() {
948            Ok(message::Unimplemented(message)) => {
949                Self::handle_unimplemented(&connection_state, message?)?
950            }
951            Ok(message::Abort(abort)) => return Err(remote_exception_to_error(abort?)),
952            Ok(message::Bootstrap(bootstrap)) => {
953                Self::handle_bootstrap(&connection_state, bootstrap?)?
954            }
955            Ok(message::Call(call)) => {
956                let call = call?;
957                let capability = connection_state.get_message_target(call.get_target()?)?;
958                let (interface_id, method_id, question_id, cap_table_array, redirect_results) = {
959                    let redirect_results = match call.get_send_results_to().which()? {
960                        call::send_results_to::Caller(()) => false,
961                        call::send_results_to::Yourself(()) => true,
962                        call::send_results_to::ThirdParty(_) => {
963                            return Err(Error::failed(
964                                "Unsupported `Call.sendResultsTo`.".to_string(),
965                            ))
966                        }
967                    };
968                    let payload = call.get_params()?;
969
970                    (
971                        call.get_interface_id(),
972                        call.get_method_id(),
973                        call.get_question_id(),
974                        Self::receive_caps(&connection_state, payload.get_cap_table()?)?,
975                        redirect_results,
976                    )
977                };
978
979                if connection_state
980                    .answers
981                    .borrow()
982                    .slots
983                    .contains_key(&question_id)
984                {
985                    return Err(Error::failed(format!(
986                        "Received a new call on in-use question id {question_id}"
987                    )));
988                }
989
990                let params = Params::new(message, cap_table_array);
991
992                let answer = Answer::new();
993
994                let (results_inner_fulfiller, results_inner_promise) = oneshot::channel();
995                let results_inner_promise = results_inner_promise.map_err(crate::canceled_to_error);
996
997                let (pipeline_sender, mut pipeline) = queued::Pipeline::new();
998                let results = Results::new(
999                    &connection_state,
1000                    question_id,
1001                    redirect_results,
1002                    results_inner_fulfiller,
1003                    answer.received_finish.clone(),
1004                    Some(pipeline_sender.weak_clone()),
1005                );
1006
1007                let (redirected_results_done_promise, redirected_results_done_fulfiller) =
1008                    if redirect_results {
1009                        let (f, p) = oneshot::channel::<Result<Response<VatId>, Error>>();
1010                        let p = p.map_err(crate::canceled_to_error).and_then(future::ready);
1011                        (Some(Promise::from_future(p)), Some(f))
1012                    } else {
1013                        (None, None)
1014                    };
1015
1016                {
1017                    let slots = &mut connection_state.answers.borrow_mut().slots;
1018                    let hash_map::Entry::Vacant(slot) = slots.entry(question_id) else {
1019                        return Err(Error::failed("questionId is already in use".to_string()));
1020                    };
1021                    slot.insert(answer);
1022                }
1023
1024                let call_promise =
1025                    capability.call(interface_id, method_id, Box::new(params), Box::new(results));
1026
1027                let promise = call_promise
1028                    .then(move |call_result| {
1029                        results_inner_promise.then(move |result| {
1030                            future::ready(ResultsDone::from_results_inner(
1031                                result,
1032                                call_result,
1033                                pipeline_sender,
1034                            ))
1035                        })
1036                    })
1037                    .then(move |v| {
1038                        if let Some(f) = redirected_results_done_fulfiller {
1039                            match v {
1040                                Ok(r) => drop(f.send(Ok(Response::redirected(r.clone())))),
1041                                Err(e) => drop(f.send(Err(e))),
1042                            }
1043                        }
1044                        Promise::ok(())
1045                    });
1046
1047                let fork = promise.shared();
1048                pipeline.drive(fork.clone());
1049
1050                {
1051                    let slots = &mut connection_state.answers.borrow_mut().slots;
1052                    let Some(answer) = slots.get_mut(&question_id) else {
1053                        unreachable!()
1054                    };
1055                    answer.pipeline = Some(Box::new(pipeline));
1056                    if redirect_results {
1057                        answer.redirected_results = redirected_results_done_promise;
1058                        // More to do here?
1059                    } else {
1060                        answer.call_completion_promise =
1061                            Some(connection_state.eagerly_evaluate(fork));
1062                    }
1063                }
1064            }
1065            Ok(message::Return(oret)) => {
1066                let ret = oret?;
1067                let question_id = ret.get_answer_id();
1068
1069                let mut questions = connection_state.questions.borrow_mut();
1070                match questions.slots[question_id as usize] {
1071                    Some(ref mut question) => {
1072                        question.is_awaiting_return = false;
1073                        if ret.get_no_finish_needed() {
1074                            question.skip_finish = true;
1075                        }
1076                        match question.self_ref {
1077                            Some(ref question_ref) => match ret.which()? {
1078                                return_::Results(results) => {
1079                                    let cap_table = Self::receive_caps(
1080                                        &connection_state,
1081                                        results?.get_cap_table()?,
1082                                    )?;
1083
1084                                    let question_ref =
1085                                        question_ref.upgrade().expect("dangling question ref?");
1086                                    let response = Response::new(
1087                                        connection_state.clone(),
1088                                        question_ref.clone(),
1089                                        message,
1090                                        cap_table,
1091                                    );
1092                                    question_ref.borrow_mut().fulfill(Promise::ok(response));
1093                                }
1094                                return_::Exception(e) => {
1095                                    let tmp =
1096                                        question_ref.upgrade().expect("dangling question ref?");
1097                                    tmp.borrow_mut().reject(remote_exception_to_error(e?));
1098                                }
1099                                return_::Canceled(_) => {
1100                                    Self::send_unimplemented(&connection_state, message.as_ref())?;
1101                                }
1102                                return_::ResultsSentElsewhere(_) => {
1103                                    Self::send_unimplemented(&connection_state, message.as_ref())?;
1104                                }
1105                                return_::TakeFromOtherQuestion(id) => {
1106                                    if let Some(answer) =
1107                                        connection_state.answers.borrow_mut().slots.get_mut(&id)
1108                                    {
1109                                        if let Some(res) = answer.redirected_results.take() {
1110                                            let tmp = question_ref
1111                                                .upgrade()
1112                                                .expect("dangling question ref?");
1113                                            tmp.borrow_mut().fulfill(res);
1114                                        } else {
1115                                            return Err(Error::failed("return.takeFromOtherQuestion referenced a call that \
1116                                                     did not use sendResultsTo.yourself.".to_string()));
1117                                        }
1118                                    } else {
1119                                        return Err(Error::failed(
1120                                            "return.takeFromOtherQuestion had invalid answer ID."
1121                                                .to_string(),
1122                                        ));
1123                                    }
1124                                }
1125                                return_::AcceptFromThirdParty(_) => {
1126                                    drop(questions);
1127                                    Self::send_unimplemented(&connection_state, message.as_ref())?;
1128                                }
1129                            },
1130                            None => {
1131                                if let return_::TakeFromOtherQuestion(_) = ret.which()? {
1132                                    return Self::send_unimplemented(
1133                                        &connection_state,
1134                                        message.as_ref(),
1135                                    );
1136                                }
1137                                // Looks like this question was canceled earlier, so `Finish`
1138                                // was already sent, with `releaseResultCaps` set true so that
1139                                // we don't have to release them here. We can go ahead and
1140                                // delete it from the table.
1141                                questions.erase(question_id);
1142                            }
1143                        }
1144                    }
1145                    None => {
1146                        return Err(Error::failed(format!(
1147                            "Invalid question ID in Return message: {question_id}"
1148                        )));
1149                    }
1150                }
1151            }
1152            Ok(message::Finish(finish)) => Self::handle_finish(&connection_state, finish?)?,
1153            Ok(message::Resolve(resolve)) => Self::handle_resolve(&connection_state, resolve?)?,
1154            Ok(message::Release(release)) => {
1155                let release = release?;
1156                connection_state.release_export(release.get_id(), release.get_reference_count())?;
1157            }
1158            Ok(message::Disembargo(disembargo)) => {
1159                Self::handle_disembargo(&connection_state, disembargo?)?
1160            }
1161            Ok(
1162                message::Provide(_)
1163                | message::Accept(_)
1164                | message::Join(_)
1165                | message::ObsoleteSave(_)
1166                | message::ObsoleteDelete(_),
1167            )
1168            | Err(::capnp::NotInSchema(_)) => {
1169                Self::send_unimplemented(&connection_state, message.as_ref())?;
1170            }
1171        }
1172        Ok(())
1173    }
1174
1175    fn answer_has_sent_return(&self, id: AnswerId, result_exports: Vec<ExportId>) {
1176        let answers_slots = &mut self.answers.borrow_mut().slots;
1177        let hash_map::Entry::Occupied(mut entry) = answers_slots.entry(id) else {
1178            unreachable!()
1179        };
1180        let a = entry.get_mut();
1181        a.return_has_been_sent = true;
1182        if a.received_finish.get() {
1183            entry.remove();
1184        } else {
1185            a.result_exports = result_exports;
1186        }
1187    }
1188
1189    fn release_export(&self, id: ExportId, refcount: u32) -> ::capnp::Result<()> {
1190        let mut exports = self.exports.borrow_mut();
1191        let Some(e) = exports.find(id) else {
1192            return Err(Error::failed(
1193                "Tried to release invalid export ID.".to_string(),
1194            ));
1195        };
1196        if refcount > e.refcount {
1197            return Err(Error::failed(
1198                "Tried to drop export's refcount below zero.".to_string(),
1199            ));
1200        }
1201        e.refcount -= refcount;
1202        if e.refcount == 0 {
1203            let client_ptr = e.client_hook.get_ptr();
1204            if e.canonical {
1205                self.exports_by_cap.borrow_mut().remove(&client_ptr);
1206            }
1207            exports.erase(id);
1208        }
1209        Ok(())
1210    }
1211
1212    fn release_exports(&self, exports: &[ExportId]) -> ::capnp::Result<()> {
1213        for &export_id in exports {
1214            self.release_export(export_id, 1)?;
1215        }
1216        Ok(())
1217    }
1218
1219    fn get_brand(&self) -> usize {
1220        self as *const _ as usize
1221    }
1222
1223    fn get_message_target(
1224        &self,
1225        target: message_target::Reader,
1226    ) -> ::capnp::Result<Box<dyn ClientHook>> {
1227        match target.which()? {
1228            message_target::ImportedCap(export_id) => {
1229                match self.exports.borrow().slots.get(export_id as usize) {
1230                    Some(Some(exp)) => Ok(exp.client_hook.clone()),
1231                    _ => Err(Error::failed(
1232                        "Message target is not a current export ID.".to_string(),
1233                    )),
1234                }
1235            }
1236            message_target::PromisedAnswer(promised_answer) => {
1237                let promised_answer = promised_answer?;
1238                let question_id = promised_answer.get_question_id();
1239
1240                let pipeline = match self.answers.borrow().slots.get(&question_id) {
1241                    None => Box::new(broken::Pipeline::new(Error::failed(
1242                        "Pipeline call on a request that returned no capabilities or was already closed.".to_string(),
1243                    ))) as Box<dyn PipelineHook>,
1244                    Some(base) => {
1245                        match base.pipeline {
1246                            Some(ref pipeline) => pipeline.add_ref(),
1247                            None => Box::new(broken::Pipeline::new(Error::failed(
1248                                "Pipeline call on a request that returned not capabilities or was \
1249                                 already closed."
1250                                    .to_string(),
1251                            ))) as Box<dyn PipelineHook>,
1252                        }
1253                    }
1254                };
1255                let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
1256                Ok(pipeline.get_pipelined_cap(&ops))
1257            }
1258        }
1259    }
1260
1261    /// If calls to the given capability should pass over this connection, fill in `target`
1262    /// appropriately for such a call and return None. Otherwise, return a `ClientHook` to which
1263    /// the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
1264    ///
1265    /// The main case where this ends up returning Some(_) is if `cap` is a promise that has
1266    /// recently resolved. The application might have started building a request before the promise
1267    /// resolved, and so the request may have been built on the assumption that it would be sent over
1268    /// this network connection, but then the promise resolved to point somewhere else before the
1269    /// request was sent. Now the request has to be redirected to the new target instead.
1270    fn write_target(
1271        &self,
1272        cap: &dyn ClientHook,
1273        target: message_target::Builder,
1274    ) -> Option<Box<dyn ClientHook>> {
1275        if cap.get_brand() == self.get_brand() {
1276            match Client::from_ptr(cap.get_ptr(), self) {
1277                Some(c) => c.write_target(target),
1278                None => unreachable!(),
1279            }
1280        } else {
1281            Some(cap.add_ref())
1282        }
1283    }
1284
1285    /// If the given client just wraps some other client -- even if it is only *temporarily*
1286    /// wrapping that other client -- returns a reference to the other client, transitively.
1287    /// Otherwise, returns a new reference to *this.
1288    fn get_innermost_client(&self, mut client: Box<dyn ClientHook>) -> Box<dyn ClientHook> {
1289        while let Some(inner) = client.get_resolved() {
1290            client = inner;
1291        }
1292        if client.get_brand() == self.get_brand() {
1293            match self.client_downcast_map.borrow().get(&client.get_ptr()) {
1294                Some(c) => Box::new(c.upgrade().expect("dangling client?")),
1295                None => unreachable!(),
1296            }
1297        } else {
1298            client
1299        }
1300    }
1301
1302    /// Implements exporting of a promise.  The promise has been exported under the given ID, and is
1303    /// to eventually resolve to the ClientHook produced by `promise`.  This method waits for that
1304    /// resolve to happen and then sends the appropriate `Resolve` message to the peer.
1305    #[allow(clippy::await_holding_refcell_ref)] // https://github.com/rust-lang/rust-clippy/issues/6353
1306    fn resolve_exported_promise(
1307        state: &Rc<Self>,
1308        export_id: ExportId,
1309        promise: Promise<Box<dyn ClientHook>, Error>,
1310    ) -> Promise<(), Error> {
1311        let weak_connection_state = Rc::downgrade(state);
1312        state.eagerly_evaluate(Promise::from_future(async move {
1313            let resolution_result = promise.await;
1314            let connection_state = weak_connection_state
1315                .upgrade()
1316                .expect("dangling connection state?");
1317
1318            match resolution_result {
1319                Ok(resolution) => {
1320                    let resolution = connection_state.get_innermost_client(resolution.clone());
1321
1322                    let brand = resolution.get_brand();
1323
1324                    // Update the export table to point at this object instead. We know that our
1325                    // entry in the export table is still live because when it is destroyed the
1326                    // asynchronous resolution task (i.e. this code) is canceled.
1327                    let mut exports = connection_state.exports.borrow_mut();
1328                    let Some(exp) = exports.find(export_id) else {
1329                        return Err(Error::failed("export table entry not found".to_string()));
1330                    };
1331
1332                    if exp.canonical {
1333                        connection_state
1334                            .exports_by_cap
1335                            .borrow_mut()
1336                            .remove(&exp.client_hook.get_ptr());
1337                    }
1338                    exp.client_hook = resolution.clone();
1339
1340                    // The export now points to `resolution`, but it is not necessarily the
1341                    // canonical export for `resolution`. The export itself still represents
1342                    // the promise that ended up resolving to `resolution`, but `resolution`
1343                    // itself also needs to be exported under a separate export ID to
1344                    // distinguish from the promise. (Unless it's also a promise, see the next
1345                    // bit...)
1346                    exp.canonical = false;
1347
1348                    if brand != connection_state.get_brand() {
1349                        // We're resolving to a local capability. If we're resolving to a promise,
1350                        // we might be able to reuse our export table entry and avoid sending a
1351                        // message.
1352                        if let Some(promise) = resolution.when_more_resolved() {
1353                            // We're replacing a promise with another local promise. In this case,
1354                            // we might actually be able to just reuse the existing export table
1355                            // entry to represent the new promise -- unless it already has an entry.
1356                            // Let's check.
1357
1358                            let mut exports_by_cap = connection_state.exports_by_cap.borrow_mut();
1359
1360                            let replacement_export_id =
1361                                match exports_by_cap.entry(exp.client_hook.get_ptr()) {
1362                                    hash_map::Entry::Occupied(occ) => *occ.get(),
1363                                    hash_map::Entry::Vacant(vac) => {
1364                                        // The replacement capability isn't previously exported,
1365                                        // so assign it to the existing table entry.
1366                                        vac.insert(export_id);
1367                                        export_id
1368                                    }
1369                                };
1370                            if replacement_export_id == export_id {
1371                                // The new promise was not already in the table, therefore the existing
1372                                // export table entry has now been repurposed to represent it. There is
1373                                // no need to send a resolve message at all. We do, however, have to
1374                                // start resolving the next promise.
1375                                exp.canonical = true;
1376                                drop(exports);
1377                                drop(exports_by_cap);
1378                                return Self::resolve_exported_promise(
1379                                    &connection_state,
1380                                    export_id,
1381                                    promise,
1382                                )
1383                                .await;
1384                            }
1385                        }
1386                    }
1387                    // Prevent a double borrow in write_descriptor() below.
1388                    drop(exports);
1389
1390                    // OK, we have to send a `Resolve` message.
1391                    let mut message = connection_state.new_outgoing_message(15)?;
1392                    {
1393                        let root: message::Builder = message.get_body()?.get_as()?;
1394                        let mut resolve = root.init_resolve();
1395                        resolve.set_promise_id(export_id);
1396                        let _export = Self::write_descriptor(
1397                            &connection_state,
1398                            resolution,
1399                            resolve.init_cap(),
1400                        )?;
1401                    }
1402                    let _ = message.send();
1403                    Ok(())
1404                }
1405                Err(e) => {
1406                    // send error resolution
1407                    let mut message = connection_state.new_outgoing_message(15)?;
1408                    {
1409                        let root: message::Builder = message.get_body()?.get_as()?;
1410                        let mut resolve = root.init_resolve();
1411                        resolve.set_promise_id(export_id);
1412                        from_error(&e, resolve.init_exception());
1413                    }
1414                    let _ = message.send();
1415                    Ok(())
1416                }
1417            }
1418        }))
1419    }
1420
1421    fn write_descriptor(
1422        state: &Rc<Self>,
1423        mut inner: Box<dyn ClientHook>,
1424        mut descriptor: cap_descriptor::Builder,
1425    ) -> ::capnp::Result<Option<ExportId>> {
1426        // Find the innermost wrapped capability.
1427        while let Some(resolved) = inner.get_resolved() {
1428            inner = resolved;
1429        }
1430        if inner.get_brand() == state.get_brand() {
1431            let Some(c) = Client::from_ptr(inner.get_ptr(), state) else {
1432                unreachable!()
1433            };
1434            Ok(c.write_descriptor(descriptor))
1435        } else {
1436            let ptr = inner.get_ptr();
1437            let contains_key = state.exports_by_cap.borrow().contains_key(&ptr);
1438            if contains_key {
1439                // We've already seen and exported this capability before.  Just up the refcount.
1440                let export_id = state.exports_by_cap.borrow()[&ptr];
1441                descriptor.set_sender_hosted(export_id);
1442                // Should never fail because exports_by_cap should match exports.
1443                state.exports.borrow_mut().find(export_id).unwrap().refcount += 1;
1444                Ok(Some(export_id))
1445            } else {
1446                // This is the first time we've seen this capability.
1447
1448                let mut exp = Export::new(inner.clone());
1449                exp.canonical = true;
1450                let export_id = state.exports.borrow_mut().push(exp);
1451                state.exports_by_cap.borrow_mut().insert(ptr, export_id);
1452                match inner.when_more_resolved() {
1453                    Some(wrapped) => {
1454                        // This is a promise.  Arrange for the `Resolve` message to be sent later.
1455                        if let Some(exp) = state.exports.borrow_mut().find(export_id) {
1456                            exp.resolve_op =
1457                                Self::resolve_exported_promise(state, export_id, wrapped);
1458                        }
1459                        descriptor.set_sender_promise(export_id);
1460                    }
1461                    None => {
1462                        descriptor.set_sender_hosted(export_id);
1463                    }
1464                }
1465                Ok(Some(export_id))
1466            }
1467        }
1468    }
1469
1470    fn write_descriptors(
1471        state: &Rc<Self>,
1472        cap_table: &[Option<Box<dyn ClientHook>>],
1473        payload: payload::Builder,
1474    ) -> Vec<ExportId> {
1475        let mut cap_table_builder = payload.init_cap_table(cap_table.len() as u32);
1476        let mut exports = Vec::new();
1477        for (idx, value) in cap_table.iter().enumerate() {
1478            match value {
1479                Some(cap) => {
1480                    if let Some(export_id) = Self::write_descriptor(
1481                        state,
1482                        cap.clone(),
1483                        cap_table_builder.reborrow().get(idx as u32),
1484                    )
1485                    .unwrap()
1486                    {
1487                        exports.push(export_id);
1488                    }
1489                }
1490                None => {
1491                    cap_table_builder.reborrow().get(idx as u32).set_none(());
1492                }
1493            }
1494        }
1495        exports
1496    }
1497
1498    fn import(state: &Rc<Self>, import_id: ImportId, is_promise: bool) -> Box<dyn ClientHook> {
1499        let import_client = {
1500            match state.imports.borrow_mut().slots.entry(import_id) {
1501                hash_map::Entry::Occupied(occ) => occ
1502                    .get()
1503                    .import_client
1504                    .upgrade()
1505                    .expect("dangling ref to import client?"),
1506                hash_map::Entry::Vacant(v) => {
1507                    let import_client = ImportClient::new(state, import_id);
1508                    v.insert(Import::new(&import_client));
1509                    import_client
1510                }
1511            }
1512        };
1513
1514        // We just received a copy of this import ID, so the remote refcount has gone up.
1515        import_client.borrow_mut().add_remote_ref();
1516
1517        let mut tmp = state.imports.borrow_mut();
1518        let Some(import) = tmp.slots.get_mut(&import_id) else {
1519            unreachable!()
1520        };
1521
1522        if is_promise {
1523            // We need to construct a PromiseClient around this import, if we haven't already.
1524            match &import.app_client {
1525                Some(c) => {
1526                    // Use the existing one.
1527                    Box::new(c.upgrade().expect("dangling client ref?"))
1528                }
1529                None => {
1530                    // Create a promise for this import's resolution.
1531
1532                    let client: Box<Client<VatId>> = Box::new(import_client.into());
1533                    let client: Box<dyn ClientHook> = client;
1534
1535                    // Here the C++ implementation does something like:
1536                    // ```
1537                    //   // Make sure the import is not destroyed while this promise exists.
1538                    //   let promise = promise.attach(client.add_ref());
1539                    // ```
1540                    // However, as far as I can tell that is unnecessary, because the
1541                    // PromiseClient holds `client` until it resolves, after which point
1542                    // there is no reason to keep the import alive.
1543
1544                    let client = PromiseClient::new(state, client, Some(import_id));
1545
1546                    import.promise_client_to_resolve = Some(Rc::downgrade(&client));
1547                    let client: Box<Client<VatId>> = Box::new(client.into());
1548                    import.app_client = Some(client.downgrade());
1549                    client
1550                }
1551            }
1552        } else {
1553            let client: Box<Client<VatId>> = Box::new(import_client.into());
1554            import.app_client = Some(client.downgrade());
1555            client
1556        }
1557    }
1558
1559    fn receive_cap(
1560        state: &Rc<Self>,
1561        descriptor: cap_descriptor::Reader,
1562    ) -> ::capnp::Result<Option<Box<dyn ClientHook>>> {
1563        match descriptor.which()? {
1564            cap_descriptor::None(()) => Ok(None),
1565            cap_descriptor::SenderHosted(sender_hosted) => {
1566                Ok(Some(Self::import(state, sender_hosted, false)))
1567            }
1568            cap_descriptor::SenderPromise(sender_promise) => {
1569                Ok(Some(Self::import(state, sender_promise, true)))
1570            }
1571            cap_descriptor::ReceiverHosted(receiver_hosted) => {
1572                if let Some(exp) = state.exports.borrow_mut().find(receiver_hosted) {
1573                    Ok(Some(exp.client_hook.add_ref()))
1574                } else {
1575                    Ok(Some(broken::new_cap(Error::failed(
1576                        "invalid 'receiverHosted' export ID".to_string(),
1577                    ))))
1578                }
1579            }
1580            cap_descriptor::ReceiverAnswer(receiver_answer) => {
1581                let promised_answer = receiver_answer?;
1582                let question_id = promised_answer.get_question_id();
1583                if let Some(answer) = state.answers.borrow().slots.get(&question_id) {
1584                    if let Some(ref pipeline) = answer.pipeline {
1585                        let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
1586                        return Ok(Some(pipeline.get_pipelined_cap(&ops)));
1587                    }
1588                }
1589                Ok(Some(broken::new_cap(Error::failed(
1590                    "invalid 'receiver answer'".to_string(),
1591                ))))
1592            }
1593            cap_descriptor::ThirdPartyHosted(_third_party_hosted) => Err(Error::unimplemented(
1594                "ThirdPartyHosted caps are not supported.".to_string(),
1595            )),
1596        }
1597    }
1598
1599    fn receive_caps(
1600        state: &Rc<Self>,
1601        cap_table: ::capnp::struct_list::Reader<cap_descriptor::Owned>,
1602    ) -> ::capnp::Result<Vec<Option<Box<dyn ClientHook>>>> {
1603        let mut result = Vec::new();
1604        for idx in 0..cap_table.len() {
1605            result.push(Self::receive_cap(state, cap_table.get(idx))?);
1606        }
1607        Ok(result)
1608    }
1609}
1610
1611enum DisconnectorState {
1612    New,
1613    Disconnecting,
1614    Disconnected,
1615}
1616
1617/// A `Future` that can be run to disconnect an `RpcSystem`'s ConnectionState and wait for it to be closed.
1618pub struct Disconnector<VatId>
1619where
1620    VatId: 'static,
1621{
1622    connection_state: Rc<RefCell<Option<Rc<ConnectionState<VatId>>>>>,
1623    state: DisconnectorState,
1624}
1625
1626impl<VatId> Disconnector<VatId> {
1627    pub fn new(connection_state: Rc<RefCell<Option<Rc<ConnectionState<VatId>>>>>) -> Self {
1628        Self {
1629            connection_state,
1630            state: DisconnectorState::New,
1631        }
1632    }
1633    fn disconnect(&self) {
1634        if let Some(ref state) = *(self.connection_state.borrow()) {
1635            state.disconnect(::capnp::Error::disconnected(
1636                "client requested disconnect".to_owned(),
1637            ));
1638        }
1639    }
1640}
1641
1642impl<VatId> Future for Disconnector<VatId>
1643where
1644    VatId: 'static,
1645{
1646    type Output = Result<(), capnp::Error>;
1647
1648    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1649        self.state = match self.state {
1650            DisconnectorState::New => {
1651                self.disconnect();
1652                DisconnectorState::Disconnecting
1653            }
1654            DisconnectorState::Disconnecting => {
1655                if self.connection_state.borrow().is_some() {
1656                    DisconnectorState::Disconnecting
1657                } else {
1658                    DisconnectorState::Disconnected
1659                }
1660            }
1661            DisconnectorState::Disconnected => DisconnectorState::Disconnected,
1662        };
1663        match self.state {
1664            DisconnectorState::New => unreachable!(),
1665            DisconnectorState::Disconnecting => {
1666                cx.waker().wake_by_ref();
1667                Poll::Pending
1668            }
1669            DisconnectorState::Disconnected => Poll::Ready(Ok(())),
1670        }
1671    }
1672}
1673
1674struct ResponseState<VatId>
1675where
1676    VatId: 'static,
1677{
1678    _connection_state: Rc<ConnectionState<VatId>>,
1679    message: Box<dyn crate::IncomingMessage>,
1680    cap_table: Vec<Option<Box<dyn ClientHook>>>,
1681    _question_ref: Rc<RefCell<QuestionRef<VatId>>>,
1682}
1683
1684enum ResponseVariant<VatId>
1685where
1686    VatId: 'static,
1687{
1688    Rpc(ResponseState<VatId>),
1689    LocallyRedirected(Box<dyn ResultsDoneHook>),
1690}
1691
1692struct Response<VatId>
1693where
1694    VatId: 'static,
1695{
1696    variant: Rc<ResponseVariant<VatId>>,
1697}
1698
1699impl<VatId> Response<VatId> {
1700    fn new(
1701        connection_state: Rc<ConnectionState<VatId>>,
1702        question_ref: Rc<RefCell<QuestionRef<VatId>>>,
1703        message: Box<dyn crate::IncomingMessage>,
1704        cap_table_array: Vec<Option<Box<dyn ClientHook>>>,
1705    ) -> Self {
1706        Self {
1707            variant: Rc::new(ResponseVariant::Rpc(ResponseState {
1708                _connection_state: connection_state,
1709                message,
1710                cap_table: cap_table_array,
1711                _question_ref: question_ref,
1712            })),
1713        }
1714    }
1715    fn redirected(results_done: Box<dyn ResultsDoneHook>) -> Self {
1716        Self {
1717            variant: Rc::new(ResponseVariant::LocallyRedirected(results_done)),
1718        }
1719    }
1720}
1721
1722impl<VatId> Clone for Response<VatId> {
1723    fn clone(&self) -> Self {
1724        Self {
1725            variant: self.variant.clone(),
1726        }
1727    }
1728}
1729
1730impl<VatId> ResponseHook for Response<VatId> {
1731    fn get(&self) -> ::capnp::Result<any_pointer::Reader<'_>> {
1732        match *self.variant {
1733            ResponseVariant::Rpc(ref state) => {
1734                match state
1735                    .message
1736                    .get_body()?
1737                    .get_as::<message::Reader>()?
1738                    .which()?
1739                {
1740                    message::Return(Ok(ret)) => match ret.which()? {
1741                        return_::Results(Ok(mut payload)) => {
1742                            use ::capnp::traits::Imbue;
1743                            payload.imbue(&state.cap_table);
1744                            Ok(payload.get_content())
1745                        }
1746                        _ => unreachable!(),
1747                    },
1748                    _ => unreachable!(),
1749                }
1750            }
1751            ResponseVariant::LocallyRedirected(ref results_done) => results_done.get(),
1752        }
1753    }
1754}
1755
1756struct Request<VatId>
1757where
1758    VatId: 'static,
1759{
1760    connection_state: Rc<ConnectionState<VatId>>,
1761    target: Client<VatId>,
1762    message: Box<dyn crate::OutgoingMessage>,
1763    cap_table: Vec<Option<Box<dyn ClientHook>>>,
1764}
1765
1766fn get_call(message: &mut Box<dyn crate::OutgoingMessage>) -> ::capnp::Result<call::Builder<'_>> {
1767    let message_root: message::Builder = message.get_body()?.get_as()?;
1768    match message_root.which()? {
1769        message::Call(call) => call,
1770        _ => {
1771            unimplemented!()
1772        }
1773    }
1774}
1775
1776impl<VatId> Request<VatId>
1777where
1778    VatId: 'static,
1779{
1780    fn new(
1781        connection_state: Rc<ConnectionState<VatId>>,
1782        _size_hint: Option<::capnp::MessageSize>,
1783        target: Client<VatId>,
1784    ) -> ::capnp::Result<Self> {
1785        let message = connection_state.new_outgoing_message(1024)?;
1786        Ok(Self {
1787            connection_state,
1788            target,
1789            message,
1790            cap_table: Vec::new(),
1791        })
1792    }
1793
1794    fn init_call(&mut self) -> call::Builder<'_> {
1795        let message_root: message::Builder = self.message.get_body().unwrap().get_as().unwrap();
1796        message_root.init_call()
1797    }
1798
1799    fn send_internal(
1800        connection_state: &Rc<ConnectionState<VatId>>,
1801        mut message: Box<dyn crate::OutgoingMessage>,
1802        cap_table: &[Option<Box<dyn ClientHook>>],
1803        is_tail_call: bool,
1804    ) -> (
1805        Rc<RefCell<QuestionRef<VatId>>>,
1806        Promise<Response<VatId>, Error>,
1807    ) {
1808        // Build the cap table.
1809        let exports = ConnectionState::write_descriptors(
1810            connection_state,
1811            cap_table,
1812            get_call(&mut message).unwrap().get_params().unwrap(),
1813        );
1814
1815        // Init the question table.  Do this after writing descriptors to avoid interference.
1816        let mut question = Question::<VatId>::new();
1817        question.is_awaiting_return = true;
1818        question.param_exports = exports;
1819        question.is_tail_call = is_tail_call;
1820
1821        let question_id = connection_state.questions.borrow_mut().push(question);
1822        {
1823            let mut call_builder: call::Builder = get_call(&mut message).unwrap();
1824            // Finish and send.
1825            call_builder.reborrow().set_question_id(question_id);
1826            if is_tail_call {
1827                call_builder.get_send_results_to().set_yourself(());
1828            }
1829        }
1830        let _ = message.send();
1831        // Make the result promise.
1832        let (fulfiller, promise) = oneshot::channel::<Promise<Response<VatId>, Error>>();
1833        let promise = promise.map_err(crate::canceled_to_error).and_then(|x| x);
1834        let question_ref = Rc::new(RefCell::new(QuestionRef::new(
1835            connection_state.clone(),
1836            question_id,
1837            fulfiller,
1838        )));
1839
1840        match connection_state.questions.borrow_mut().slots[question_id as usize] {
1841            Some(ref mut q) => {
1842                q.self_ref = Some(Rc::downgrade(&question_ref));
1843            }
1844            None => unreachable!(),
1845        }
1846
1847        let promise = promise.attach(question_ref.clone());
1848        let promise2 = Promise::from_future(promise);
1849
1850        (question_ref, promise2)
1851    }
1852
1853    fn send_streaming_internal(
1854        connection_state: &Rc<ConnectionState<VatId>>,
1855        mut message: Box<dyn crate::OutgoingMessage>,
1856        cap_table: &[Option<Box<dyn ClientHook>>],
1857        flow: Rc<RefCell<Option<Box<dyn crate::FlowController>>>>,
1858    ) -> Promise<(), Error> {
1859        // Build the cap table.
1860        let exports = ConnectionState::write_descriptors(
1861            connection_state,
1862            cap_table,
1863            get_call(&mut message).unwrap().get_params().unwrap(),
1864        );
1865
1866        // Init the question table.  Do this after writing descriptors to avoid interference.
1867        let mut question = Question::<VatId>::new();
1868        question.is_awaiting_return = true;
1869        question.param_exports = exports;
1870        question.is_tail_call = false;
1871
1872        let question_id = connection_state.questions.borrow_mut().push(question);
1873        {
1874            let mut call_builder: call::Builder = get_call(&mut message).unwrap();
1875            call_builder.reborrow().set_question_id(question_id);
1876        }
1877
1878        // Make the result promise.
1879        let (fulfiller, promise) = oneshot::channel::<Promise<Response<VatId>, Error>>();
1880        let promise = promise.map_err(crate::canceled_to_error).and_then(|x| x);
1881        let question_ref = Rc::new(RefCell::new(QuestionRef::new(
1882            connection_state.clone(),
1883            question_id,
1884            fulfiller,
1885        )));
1886
1887        match connection_state.questions.borrow_mut().slots[question_id as usize] {
1888            Some(ref mut q) => {
1889                q.self_ref = Some(Rc::downgrade(&question_ref));
1890            }
1891            None => unreachable!(),
1892        }
1893        let promise = promise.attach(question_ref.clone());
1894
1895        let mut flow = flow.borrow_mut();
1896        if flow.is_none() {
1897            match connection_state.connection.borrow_mut().as_mut() {
1898                Err(_) => return Promise::err(Error::failed("no connection".into())),
1899                Ok(connection) => {
1900                    let (s, p) = connection.new_stream();
1901                    connection_state.add_task(p);
1902                    *flow = Some(s);
1903                }
1904            };
1905        }
1906        let Some(ref mut flow) = *flow else {
1907            unreachable!()
1908        };
1909        flow.send(
1910            message,
1911            Promise::from_future(async move {
1912                let _ = promise.await?;
1913                Ok(())
1914            }),
1915        )
1916    }
1917}
1918
1919impl<VatId> RequestHook for Request<VatId> {
1920    fn get(&mut self) -> any_pointer::Builder<'_> {
1921        use ::capnp::traits::ImbueMut;
1922        let mut builder = get_call(&mut self.message)
1923            .unwrap()
1924            .get_params()
1925            .unwrap()
1926            .get_content();
1927        builder.imbue_mut(&mut self.cap_table);
1928        builder
1929    }
1930    fn get_brand<'a>(&self) -> usize {
1931        self.connection_state.get_brand()
1932    }
1933    fn send(self: Box<Self>) -> ::capnp::capability::RemotePromise<any_pointer::Owned> {
1934        let tmp = *self;
1935        let Self {
1936            connection_state,
1937            target,
1938            mut message,
1939            cap_table,
1940        } = tmp;
1941        let write_target_result = {
1942            let call_builder: call::Builder = get_call(&mut message).unwrap();
1943            target.write_target(call_builder.get_target().unwrap())
1944        };
1945        if let Some(redirect) = write_target_result {
1946            // Whoops, this capability has been redirected while we were building the request!
1947            // We'll have to make a new request and do a copy.  Ick.
1948            let mut call_builder: call::Builder = get_call(&mut message).unwrap();
1949            let mut replacement = redirect.new_call(
1950                call_builder.reborrow().get_interface_id(),
1951                call_builder.reborrow().get_method_id(),
1952                None,
1953            );
1954
1955            replacement
1956                .set(
1957                    call_builder
1958                        .get_params()
1959                        .unwrap()
1960                        .get_content()
1961                        .into_reader(),
1962                )
1963                .unwrap();
1964            return replacement.send();
1965        }
1966        let (question_ref, promise) =
1967            Self::send_internal(&connection_state, message, &cap_table, false);
1968        let forked_promise1 = promise.shared();
1969        let forked_promise2 = forked_promise1.clone();
1970
1971        // The pipeline must get notified of resolution before the app does to maintain ordering.
1972        let pipeline = Pipeline::new(
1973            &connection_state,
1974            question_ref,
1975            Some(Promise::from_future(forked_promise1)),
1976        );
1977
1978        let resolved = pipeline.when_resolved();
1979
1980        let forked_promise2 = resolved.map(|_| Ok(())).and_then(|()| forked_promise2);
1981
1982        let app_promise = Promise::from_future(
1983            forked_promise2
1984                .map_ok(|response| ::capnp::capability::Response::new(Box::new(response))),
1985        );
1986
1987        ::capnp::capability::RemotePromise {
1988            promise: app_promise,
1989            pipeline: any_pointer::Pipeline::new(Box::new(pipeline)),
1990        }
1991    }
1992    fn send_streaming(self: Box<Self>) -> Promise<(), Error> {
1993        let tmp = *self;
1994        let Self {
1995            connection_state,
1996            target,
1997            mut message,
1998            cap_table,
1999        } = tmp;
2000        let write_target_result = {
2001            let call_builder: call::Builder = get_call(&mut message).unwrap();
2002            target.write_target(call_builder.get_target().unwrap())
2003        };
2004        if let Some(redirect) = write_target_result {
2005            // Whoops, this capability has been redirected while we were building the request!
2006            // We'll have to make a new request and do a copy.  Ick.
2007            let mut call_builder: call::Builder = get_call(&mut message).unwrap();
2008            let mut replacement = redirect.new_call(
2009                call_builder.reborrow().get_interface_id(),
2010                call_builder.reborrow().get_method_id(),
2011                None,
2012            );
2013
2014            replacement
2015                .set(
2016                    call_builder
2017                        .get_params()
2018                        .unwrap()
2019                        .get_content()
2020                        .into_reader(),
2021                )
2022                .unwrap();
2023            return replacement.hook.send_streaming();
2024        }
2025        Self::send_streaming_internal(
2026            &connection_state,
2027            message,
2028            &cap_table,
2029            target.flow_controller,
2030        )
2031    }
2032    fn tail_send(self: Box<Self>) -> Option<(u32, Promise<(), Error>, Box<dyn PipelineHook>)> {
2033        let tmp = *self;
2034        let Self {
2035            connection_state,
2036            target,
2037            mut message,
2038            cap_table,
2039        } = tmp;
2040
2041        if connection_state.connection.borrow().is_err() {
2042            // Disconnected; fall back to a regular send() which will fail appropriately.
2043            return None;
2044        }
2045
2046        let write_target_result = {
2047            let call_builder: crate::rpc_capnp::call::Builder = get_call(&mut message).unwrap();
2048            target.write_target(call_builder.get_target().unwrap())
2049        };
2050
2051        let (question_ref, promise) = match write_target_result {
2052            Some(_redirect) => {
2053                return None;
2054            }
2055            None => Self::send_internal(&connection_state, message, &cap_table, true),
2056        };
2057
2058        let promise = promise.map_ok(|_response| {
2059            // Response should be null if `Return` handling code is correct.
2060
2061            unimplemented!()
2062        });
2063
2064        let question_id = question_ref.borrow().id;
2065        let pipeline = Pipeline::never_done(connection_state, question_ref);
2066
2067        Some((
2068            question_id,
2069            Promise::from_future(promise),
2070            Box::new(pipeline),
2071        ))
2072    }
2073}
2074
2075enum PipelineVariant<VatId>
2076where
2077    VatId: 'static,
2078{
2079    Waiting(Rc<RefCell<QuestionRef<VatId>>>),
2080    Resolved(Response<VatId>),
2081    Broken(Error),
2082}
2083
2084struct PipelineState<VatId>
2085where
2086    VatId: 'static,
2087{
2088    variant: PipelineVariant<VatId>,
2089    redirect_later: Option<RefCell<futures::future::Shared<Promise<Response<VatId>, Error>>>>,
2090    connection_state: Rc<ConnectionState<VatId>>,
2091
2092    #[allow(dead_code)]
2093    resolve_self_promise: Promise<(), Error>,
2094
2095    promise_clients_to_resolve: RefCell<
2096        crate::sender_queue::SenderQueue<
2097            (Weak<RefCell<PromiseClient<VatId>>>, Vec<PipelineOp>),
2098            (),
2099        >,
2100    >,
2101    resolution_waiters: crate::sender_queue::SenderQueue<(), ()>,
2102}
2103
2104impl<VatId> PipelineState<VatId>
2105where
2106    VatId: 'static,
2107{
2108    fn resolve(state: &Rc<RefCell<Self>>, response: Result<Response<VatId>, Error>) {
2109        let to_resolve = {
2110            let tmp = state.borrow();
2111            let r = tmp.promise_clients_to_resolve.borrow_mut().drain();
2112            r
2113        };
2114        for ((c, ops), _) in to_resolve {
2115            let resolved = match response.clone() {
2116                Ok(v) => match v.get() {
2117                    Ok(x) => x.get_pipelined_cap(&ops),
2118                    Err(e) => Err(e),
2119                },
2120                Err(e) => Err(e),
2121            };
2122            if let Some(c) = c.upgrade() {
2123                c.borrow_mut().resolve(resolved);
2124            }
2125        }
2126
2127        let new_variant = match response {
2128            Ok(r) => PipelineVariant::Resolved(r),
2129            Err(e) => PipelineVariant::Broken(e),
2130        };
2131        let _old_variant = mem::replace(&mut state.borrow_mut().variant, new_variant);
2132
2133        let waiters = state.borrow_mut().resolution_waiters.drain();
2134        for (_, waiter) in waiters {
2135            let _ = waiter.send(());
2136        }
2137    }
2138}
2139
2140struct Pipeline<VatId>
2141where
2142    VatId: 'static,
2143{
2144    state: Rc<RefCell<PipelineState<VatId>>>,
2145}
2146
2147impl<VatId> Pipeline<VatId> {
2148    fn new(
2149        connection_state: &Rc<ConnectionState<VatId>>,
2150        question_ref: Rc<RefCell<QuestionRef<VatId>>>,
2151        redirect_later: Option<Promise<Response<VatId>, ::capnp::Error>>,
2152    ) -> Self {
2153        let state = Rc::new(RefCell::new(PipelineState {
2154            variant: PipelineVariant::Waiting(question_ref),
2155            connection_state: connection_state.clone(),
2156            redirect_later: None,
2157            resolve_self_promise: Promise::from_future(future::pending()),
2158            promise_clients_to_resolve: RefCell::new(crate::sender_queue::SenderQueue::new()),
2159            resolution_waiters: crate::sender_queue::SenderQueue::new(),
2160        }));
2161        if let Some(redirect_later_promise) = redirect_later {
2162            let fork = redirect_later_promise.shared();
2163            let this = Rc::downgrade(&state);
2164            let resolve_self_promise =
2165                connection_state.eagerly_evaluate(fork.clone().then(move |response| {
2166                    let Some(state) = this.upgrade() else {
2167                        return Promise::err(Error::failed("dangling reference to this".into()));
2168                    };
2169                    PipelineState::resolve(&state, response);
2170                    Promise::ok(())
2171                }));
2172
2173            state.borrow_mut().resolve_self_promise = resolve_self_promise;
2174            state.borrow_mut().redirect_later = Some(RefCell::new(fork));
2175        }
2176        Self { state }
2177    }
2178
2179    fn when_resolved(&self) -> Promise<(), Error> {
2180        self.state.borrow_mut().resolution_waiters.push(())
2181    }
2182
2183    fn never_done(
2184        connection_state: Rc<ConnectionState<VatId>>,
2185        question_ref: Rc<RefCell<QuestionRef<VatId>>>,
2186    ) -> Self {
2187        let state = Rc::new(RefCell::new(PipelineState {
2188            variant: PipelineVariant::Waiting(question_ref),
2189            connection_state,
2190            redirect_later: None,
2191            resolve_self_promise: Promise::from_future(future::pending()),
2192            promise_clients_to_resolve: RefCell::new(crate::sender_queue::SenderQueue::new()),
2193            resolution_waiters: crate::sender_queue::SenderQueue::new(),
2194        }));
2195
2196        Self { state }
2197    }
2198}
2199
2200impl<VatId> PipelineHook for Pipeline<VatId> {
2201    fn add_ref(&self) -> Box<dyn PipelineHook> {
2202        Box::new(Self {
2203            state: self.state.clone(),
2204        })
2205    }
2206    fn get_pipelined_cap(&self, ops: &[PipelineOp]) -> Box<dyn ClientHook> {
2207        self.get_pipelined_cap_move(ops.into())
2208    }
2209    fn get_pipelined_cap_move(&self, ops: Vec<PipelineOp>) -> Box<dyn ClientHook> {
2210        match *self.state.borrow() {
2211            PipelineState {
2212                variant: PipelineVariant::Waiting(ref question_ref),
2213                ref connection_state,
2214                ref redirect_later,
2215                ref promise_clients_to_resolve,
2216                ..
2217            } => {
2218                // Wrap a PipelineClient in a PromiseClient.
2219                let pipeline_client =
2220                    PipelineClient::new(connection_state, question_ref.clone(), ops.clone());
2221
2222                match redirect_later {
2223                    Some(_r) => {
2224                        let client: Client<VatId> = pipeline_client.into();
2225                        let promise_client =
2226                            PromiseClient::new(connection_state, Box::new(client), None);
2227                        promise_clients_to_resolve
2228                            .borrow_mut()
2229                            .push_detach((Rc::downgrade(&promise_client), ops));
2230                        let result: Client<VatId> = promise_client.into();
2231                        Box::new(result)
2232                    }
2233                    None => {
2234                        // Oh, this pipeline will never get redirected, so just return the PipelineClient.
2235                        let client: Client<VatId> = pipeline_client.into();
2236                        Box::new(client)
2237                    }
2238                }
2239            }
2240            PipelineState {
2241                variant: PipelineVariant::Resolved(ref response),
2242                ..
2243            } => response.get().unwrap().get_pipelined_cap(&ops[..]).unwrap(),
2244            PipelineState {
2245                variant: PipelineVariant::Broken(ref e),
2246                ..
2247            } => broken::new_cap(e.clone()),
2248        }
2249    }
2250}
2251
2252pub struct Params {
2253    request: Box<dyn crate::IncomingMessage>,
2254    cap_table: Vec<Option<Box<dyn ClientHook>>>,
2255}
2256
2257impl Params {
2258    fn new(
2259        request: Box<dyn crate::IncomingMessage>,
2260        cap_table: Vec<Option<Box<dyn ClientHook>>>,
2261    ) -> Self {
2262        Self { request, cap_table }
2263    }
2264}
2265
2266impl ParamsHook for Params {
2267    fn get(&self) -> ::capnp::Result<any_pointer::Reader<'_>> {
2268        let root: message::Reader = self.request.get_body()?.get_as()?;
2269        let message::Call(call) = root.which()? else {
2270            unreachable!()
2271        };
2272        use ::capnp::traits::Imbue;
2273        let mut content = call?.get_params()?.get_content();
2274        content.imbue(&self.cap_table);
2275        Ok(content)
2276    }
2277}
2278
2279enum ResultsVariant {
2280    Rpc(
2281        Box<dyn crate::OutgoingMessage>,
2282        Vec<Option<Box<dyn ClientHook>>>,
2283    ),
2284    LocallyRedirected(
2285        ::capnp::message::Builder<::capnp::message::HeapAllocator>,
2286        Vec<Option<Box<dyn ClientHook>>>,
2287    ),
2288}
2289
2290struct ResultsInner<VatId>
2291where
2292    VatId: 'static,
2293{
2294    connection_state: Rc<ConnectionState<VatId>>,
2295    variant: Option<ResultsVariant>,
2296    redirect_results: bool,
2297    answer_id: AnswerId,
2298    finish_received: Rc<Cell<bool>>,
2299    pipeline_sender: Option<queued::PipelineInnerSender>,
2300}
2301
2302impl<VatId> ResultsInner<VatId>
2303where
2304    VatId: 'static,
2305{
2306    fn ensure_initialized(&mut self) {
2307        let answer_id = self.answer_id;
2308        if self.variant.is_none() {
2309            match (
2310                self.redirect_results,
2311                self.connection_state.connection.borrow_mut().as_mut(),
2312            ) {
2313                (false, Ok(c)) => {
2314                    let mut message = c.new_outgoing_message(100); // size hint?
2315
2316                    {
2317                        let root: message::Builder = message.get_body().unwrap().init_as();
2318                        let mut ret = root.init_return();
2319                        ret.set_answer_id(answer_id);
2320                        ret.set_release_param_caps(false);
2321                    }
2322                    self.variant = Some(ResultsVariant::Rpc(message, Vec::new()));
2323                }
2324                _ => {
2325                    self.variant = Some(ResultsVariant::LocallyRedirected(
2326                        ::capnp::message::Builder::new_default(),
2327                        Vec::new(),
2328                    ));
2329                }
2330            }
2331        }
2332    }
2333}
2334
2335// This takes the place of both RpcCallContext and RpcServerResponse in capnproto-c++.
2336pub struct Results<VatId>
2337where
2338    VatId: 'static,
2339{
2340    inner: Option<ResultsInner<VatId>>,
2341    results_done_fulfiller: Option<oneshot::Sender<ResultsInner<VatId>>>,
2342}
2343
2344impl<VatId> Results<VatId>
2345where
2346    VatId: 'static,
2347{
2348    fn new(
2349        connection_state: &Rc<ConnectionState<VatId>>,
2350        answer_id: AnswerId,
2351        redirect_results: bool,
2352        fulfiller: oneshot::Sender<ResultsInner<VatId>>,
2353        finish_received: Rc<Cell<bool>>,
2354        pipeline_sender: Option<queued::PipelineInnerSender>,
2355    ) -> Self {
2356        Self {
2357            inner: Some(ResultsInner {
2358                variant: None,
2359                connection_state: connection_state.clone(),
2360                redirect_results,
2361                answer_id,
2362                finish_received,
2363                pipeline_sender,
2364            }),
2365            results_done_fulfiller: Some(fulfiller),
2366        }
2367    }
2368}
2369
2370impl<VatId> Drop for Results<VatId> {
2371    fn drop(&mut self) {
2372        match (self.inner.take(), self.results_done_fulfiller.take()) {
2373            (Some(inner), Some(fulfiller)) => {
2374                let _ = fulfiller.send(inner);
2375            }
2376            (None, None) => (),
2377            _ => unreachable!(),
2378        }
2379    }
2380}
2381
2382impl<VatId> ResultsHook for Results<VatId> {
2383    fn get(&mut self) -> ::capnp::Result<any_pointer::Builder<'_>> {
2384        use ::capnp::traits::ImbueMut;
2385        let Some(ref mut inner) = self.inner else {
2386            unreachable!();
2387        };
2388        inner.ensure_initialized();
2389        match inner.variant {
2390            None => unreachable!(),
2391            Some(ResultsVariant::Rpc(ref mut message, ref mut cap_table)) => {
2392                let root: message::Builder = message.get_body()?.get_as()?;
2393                let message::Return(ret) = root.which()? else {
2394                    unreachable!();
2395                };
2396                let return_::Results(payload) = ret?.which()? else {
2397                    unreachable!()
2398                };
2399                let mut content = payload?.get_content();
2400                content.imbue_mut(cap_table);
2401                Ok(content)
2402            }
2403            Some(ResultsVariant::LocallyRedirected(ref mut message, ref mut cap_table)) => {
2404                let mut result: any_pointer::Builder = message.get_root()?;
2405                result.imbue_mut(cap_table);
2406                Ok(result)
2407            }
2408        }
2409    }
2410
2411    fn set_pipeline(&mut self) -> ::capnp::Result<()> {
2412        use ::capnp::traits::ImbueMut;
2413        let root = self.get()?;
2414        let size = root.target_size()?;
2415        let mut message2 = capnp::message::Builder::new(
2416            capnp::message::HeapAllocator::new().first_segment_words(size.word_count as u32 + 1),
2417        );
2418        let mut root2: capnp::any_pointer::Builder = message2.init_root();
2419        let mut cap_table2 = vec![];
2420        root2.imbue_mut(&mut cap_table2);
2421        root2.set_as(root.into_reader())?;
2422        let hook =
2423            Box::new(local::ResultsDone::new(message2, cap_table2)) as Box<dyn ResultsDoneHook>;
2424        let Some(ref mut inner) = self.inner else {
2425            unreachable!();
2426        };
2427        let Some(sender) = inner.pipeline_sender.take() else {
2428            return Err(Error::failed("set_pipeline() called twice".into()));
2429        };
2430        sender.complete(Box::new(local::Pipeline::new(hook)));
2431        Ok(())
2432    }
2433
2434    fn tail_call(self: Box<Self>, _request: Box<dyn RequestHook>) -> Promise<(), Error> {
2435        unimplemented!()
2436    }
2437
2438    fn direct_tail_call(
2439        mut self: Box<Self>,
2440        request: Box<dyn RequestHook>,
2441    ) -> (Promise<(), Error>, Box<dyn PipelineHook>) {
2442        if let (Some(inner), Some(fulfiller)) =
2443            (self.inner.take(), self.results_done_fulfiller.take())
2444        {
2445            let state = inner.connection_state.clone();
2446            if request.get_brand() == state.get_brand() && !inner.redirect_results {
2447                // The tail call is headed towards the peer that called us in the first place, so we can
2448                // optimize out the return trip.
2449                if let Some((question_id, promise, pipeline)) = request.tail_send() {
2450                    let mut message = state.new_outgoing_message(100).expect("no connection?"); // size hint?
2451
2452                    {
2453                        let root: message::Builder = message.get_body().unwrap().init_as();
2454                        let mut ret = root.init_return();
2455                        ret.set_answer_id(inner.answer_id);
2456                        ret.set_release_param_caps(false);
2457                        ret.set_take_from_other_question(question_id);
2458                    }
2459                    let _ = message.send();
2460
2461                    // TODO cleanupanswertable
2462
2463                    let _ = fulfiller.send(inner); // ??
2464                    return (promise, pipeline);
2465                }
2466                unimplemented!()
2467            } else {
2468                unimplemented!()
2469            }
2470        } else {
2471            unreachable!();
2472        }
2473    }
2474
2475    fn allow_cancellation(&self) {
2476        unimplemented!()
2477    }
2478}
2479
2480enum ResultsDoneVariant {
2481    Rpc(
2482        Rc<::capnp::message::Builder<::capnp::message::HeapAllocator>>,
2483        Vec<Option<Box<dyn ClientHook>>>,
2484    ),
2485    LocallyRedirected(
2486        ::capnp::message::Builder<::capnp::message::HeapAllocator>,
2487        Vec<Option<Box<dyn ClientHook>>>,
2488    ),
2489}
2490
2491struct ResultsDone {
2492    inner: Rc<ResultsDoneVariant>,
2493}
2494
2495impl ResultsDone {
2496    fn from_results_inner<VatId>(
2497        results_inner: Result<ResultsInner<VatId>, Error>,
2498        call_status: Result<(), Error>,
2499        pipeline_sender: queued::PipelineInnerSender,
2500    ) -> Result<Box<dyn ResultsDoneHook>, Error>
2501    where
2502        VatId: 'static,
2503    {
2504        match results_inner {
2505            Err(e) => {
2506                pipeline_sender.complete(Box::new(crate::broken::Pipeline::new(e.clone())));
2507                Err(e)
2508            }
2509            Ok(mut results_inner) => {
2510                results_inner.ensure_initialized();
2511                let ResultsInner {
2512                    connection_state,
2513                    variant,
2514                    answer_id,
2515                    finish_received,
2516                    ..
2517                } = results_inner;
2518                match variant {
2519                    None => unreachable!(),
2520                    Some(ResultsVariant::Rpc(mut message, cap_table)) => {
2521                        match (finish_received.get(), call_status) {
2522                            (true, _) => {
2523                                let hook = Box::new(Self::rpc(Rc::new(message.take()), cap_table))
2524                                    as Box<dyn ResultsDoneHook>;
2525                                pipeline_sender
2526                                    .complete(Box::new(local::Pipeline::new(hook.clone())));
2527
2528                                // Send a Canceled return.
2529                                if let Ok(connection) =
2530                                    connection_state.connection.borrow_mut().as_mut()
2531                                {
2532                                    let mut message = connection.new_outgoing_message(10);
2533                                    {
2534                                        let root: message::Builder =
2535                                            message.get_body()?.get_as()?;
2536                                        let mut ret = root.init_return();
2537                                        ret.set_answer_id(answer_id);
2538                                        ret.set_release_param_caps(false);
2539                                        ret.set_canceled(());
2540                                    }
2541                                    let _ = message.send();
2542                                }
2543
2544                                connection_state.answer_has_sent_return(answer_id, Vec::new());
2545                                Ok(hook)
2546                            }
2547                            (false, Ok(())) => {
2548                                let exports = {
2549                                    let root: message::Builder = message.get_body()?.get_as()?;
2550                                    let message::Return(Ok(mut ret)) = root.which()? else {
2551                                        unreachable!()
2552                                    };
2553                                    if cap_table.is_empty() {
2554                                        ret.set_no_finish_needed(true);
2555                                        finish_received.set(true);
2556                                    }
2557                                    let crate::rpc_capnp::return_::Results(Ok(payload)) =
2558                                        ret.which()?
2559                                    else {
2560                                        unreachable!()
2561                                    };
2562                                    ConnectionState::write_descriptors(
2563                                        &connection_state,
2564                                        &cap_table,
2565                                        payload,
2566                                    )
2567                                };
2568
2569                                let (_promise, m) = message.send();
2570                                connection_state.answer_has_sent_return(answer_id, exports);
2571                                let hook =
2572                                    Box::new(Self::rpc(m, cap_table)) as Box<dyn ResultsDoneHook>;
2573                                pipeline_sender
2574                                    .complete(Box::new(local::Pipeline::new(hook.clone())));
2575                                Ok(hook)
2576                            }
2577                            (false, Err(e)) => {
2578                                // Send an error return.
2579                                if let Ok(connection) =
2580                                    connection_state.connection.borrow_mut().as_mut()
2581                                {
2582                                    let mut message = connection.new_outgoing_message(50); // XXX size hint
2583                                    {
2584                                        let root: message::Builder =
2585                                            message.get_body()?.get_as()?;
2586                                        let mut ret = root.init_return();
2587                                        ret.set_answer_id(answer_id);
2588                                        ret.set_release_param_caps(false);
2589                                        let mut exc = ret.init_exception();
2590                                        from_error(&e, exc.reborrow());
2591                                    }
2592                                    let _ = message.send();
2593                                }
2594                                connection_state.answer_has_sent_return(answer_id, Vec::new());
2595
2596                                pipeline_sender
2597                                    .complete(Box::new(crate::broken::Pipeline::new(e.clone())));
2598
2599                                Err(e)
2600                            }
2601                        }
2602                    }
2603                    Some(ResultsVariant::LocallyRedirected(results_done, cap_table)) => {
2604                        let hook = Box::new(Self::redirected(results_done, cap_table))
2605                            as Box<dyn ResultsDoneHook>;
2606                        pipeline_sender
2607                            .complete(Box::new(crate::local::Pipeline::new(hook.clone())));
2608                        Ok(hook)
2609                    }
2610                }
2611            }
2612        }
2613    }
2614
2615    fn rpc(
2616        message: Rc<::capnp::message::Builder<::capnp::message::HeapAllocator>>,
2617        cap_table: Vec<Option<Box<dyn ClientHook>>>,
2618    ) -> Self {
2619        Self {
2620            inner: Rc::new(ResultsDoneVariant::Rpc(message, cap_table)),
2621        }
2622    }
2623
2624    fn redirected(
2625        message: ::capnp::message::Builder<::capnp::message::HeapAllocator>,
2626        cap_table: Vec<Option<Box<dyn ClientHook>>>,
2627    ) -> Self {
2628        Self {
2629            inner: Rc::new(ResultsDoneVariant::LocallyRedirected(message, cap_table)),
2630        }
2631    }
2632}
2633
2634impl ResultsDoneHook for ResultsDone {
2635    fn add_ref(&self) -> Box<dyn ResultsDoneHook> {
2636        Box::new(Self {
2637            inner: self.inner.clone(),
2638        })
2639    }
2640    fn get(&self) -> ::capnp::Result<any_pointer::Reader<'_>> {
2641        use ::capnp::traits::Imbue;
2642        match *self.inner {
2643            ResultsDoneVariant::Rpc(ref message, ref cap_table) => {
2644                let root: message::Reader = message.get_root_as_reader()?;
2645                let message::Return(ret) = root.which()? else {
2646                    unreachable!();
2647                };
2648                let crate::rpc_capnp::return_::Results(payload) = ret?.which()? else {
2649                    unreachable!();
2650                };
2651                let mut content = payload?.get_content();
2652                content.imbue(cap_table);
2653                Ok(content)
2654            }
2655            ResultsDoneVariant::LocallyRedirected(ref message, ref cap_table) => {
2656                let mut result: any_pointer::Reader = message.get_root_as_reader()?;
2657                result.imbue(cap_table);
2658                Ok(result)
2659            }
2660        }
2661    }
2662}
2663
2664enum ClientVariant<VatId>
2665where
2666    VatId: 'static,
2667{
2668    Import(Rc<RefCell<ImportClient<VatId>>>),
2669    Pipeline(Rc<RefCell<PipelineClient<VatId>>>),
2670    Promise(Rc<RefCell<PromiseClient<VatId>>>),
2671}
2672
2673struct Client<VatId>
2674where
2675    VatId: 'static,
2676{
2677    connection_state: Rc<ConnectionState<VatId>>,
2678    variant: ClientVariant<VatId>,
2679    flow_controller: Rc<RefCell<Option<Box<dyn crate::FlowController>>>>,
2680}
2681
2682enum WeakClientVariant<VatId>
2683where
2684    VatId: 'static,
2685{
2686    Import(Weak<RefCell<ImportClient<VatId>>>),
2687    Pipeline(Weak<RefCell<PipelineClient<VatId>>>),
2688    Promise(Weak<RefCell<PromiseClient<VatId>>>),
2689}
2690
2691struct WeakClient<VatId>
2692where
2693    VatId: 'static,
2694{
2695    connection_state: Weak<ConnectionState<VatId>>,
2696    variant: WeakClientVariant<VatId>,
2697    flow_controller: Weak<RefCell<Option<Box<dyn crate::FlowController>>>>,
2698}
2699
2700impl<VatId> WeakClient<VatId>
2701where
2702    VatId: 'static,
2703{
2704    fn upgrade(&self) -> Option<Client<VatId>> {
2705        let variant = match &self.variant {
2706            WeakClientVariant::Import(ic) => ClientVariant::Import(ic.upgrade()?),
2707            WeakClientVariant::Pipeline(pc) => ClientVariant::Pipeline(pc.upgrade()?),
2708            WeakClientVariant::Promise(pc) => ClientVariant::Promise(pc.upgrade()?),
2709        };
2710        let connection_state = self.connection_state.upgrade()?;
2711        let flow_controller = self.flow_controller.upgrade()?;
2712        Some(Client {
2713            connection_state,
2714            variant,
2715            flow_controller,
2716        })
2717    }
2718}
2719
2720struct ImportClient<VatId>
2721where
2722    VatId: 'static,
2723{
2724    connection_state: Rc<ConnectionState<VatId>>,
2725    import_id: ImportId,
2726
2727    /// Number of times we've received this import from the peer.
2728    remote_ref_count: u32,
2729}
2730
2731impl<VatId> Drop for ImportClient<VatId> {
2732    fn drop(&mut self) {
2733        let connection_state = self.connection_state.clone();
2734
2735        assert!(connection_state
2736            .client_downcast_map
2737            .borrow_mut()
2738            .remove(&((self) as *const _ as usize))
2739            .is_some());
2740
2741        // Remove the corresponding entry of the imports table.
2742        // Note: the C++ implementation checks here pointer equality between self and
2743        // the entry in the imports table, but as far as I can tell the check should
2744        // always pass because of how we construct ImportClient in import().
2745        connection_state
2746            .imports
2747            .borrow_mut()
2748            .slots
2749            .remove(&self.import_id);
2750
2751        // Send a message releasing our remote references.
2752        let mut tmp = connection_state.connection.borrow_mut();
2753        if let (true, Ok(c)) = (self.remote_ref_count > 0, tmp.as_mut()) {
2754            let mut message = c.new_outgoing_message(10);
2755            {
2756                let root: message::Builder = message.get_body().unwrap().init_as();
2757                let mut release = root.init_release();
2758                release.set_id(self.import_id);
2759                release.set_reference_count(self.remote_ref_count);
2760            }
2761            let _ = message.send();
2762        }
2763    }
2764}
2765
2766impl<VatId> ImportClient<VatId>
2767where
2768    VatId: 'static,
2769{
2770    fn new(
2771        connection_state: &Rc<ConnectionState<VatId>>,
2772        import_id: ImportId,
2773    ) -> Rc<RefCell<Self>> {
2774        Rc::new(RefCell::new(Self {
2775            connection_state: connection_state.clone(),
2776            import_id,
2777            remote_ref_count: 0,
2778        }))
2779    }
2780
2781    fn add_remote_ref(&mut self) {
2782        self.remote_ref_count += 1;
2783    }
2784}
2785
2786impl<VatId> From<Rc<RefCell<ImportClient<VatId>>>> for Client<VatId> {
2787    fn from(client: Rc<RefCell<ImportClient<VatId>>>) -> Self {
2788        let connection_state = client.borrow().connection_state.clone();
2789        Self::new(&connection_state, ClientVariant::Import(client))
2790    }
2791}
2792
2793/// A `ClientHook` representing a pipelined promise.  Always wrapped in `PromiseClient`.
2794struct PipelineClient<VatId>
2795where
2796    VatId: 'static,
2797{
2798    connection_state: Rc<ConnectionState<VatId>>,
2799    question_ref: Rc<RefCell<QuestionRef<VatId>>>,
2800    ops: Vec<PipelineOp>,
2801}
2802
2803impl<VatId> PipelineClient<VatId>
2804where
2805    VatId: 'static,
2806{
2807    fn new(
2808        connection_state: &Rc<ConnectionState<VatId>>,
2809        question_ref: Rc<RefCell<QuestionRef<VatId>>>,
2810        ops: Vec<PipelineOp>,
2811    ) -> Rc<RefCell<Self>> {
2812        Rc::new(RefCell::new(Self {
2813            connection_state: connection_state.clone(),
2814            question_ref,
2815            ops,
2816        }))
2817    }
2818}
2819
2820impl<VatId> From<Rc<RefCell<PipelineClient<VatId>>>> for Client<VatId> {
2821    fn from(client: Rc<RefCell<PipelineClient<VatId>>>) -> Self {
2822        let connection_state = client.borrow().connection_state.clone();
2823        Self::new(&connection_state, ClientVariant::Pipeline(client))
2824    }
2825}
2826
2827impl<VatId> Drop for PipelineClient<VatId> {
2828    fn drop(&mut self) {
2829        assert!(self
2830            .connection_state
2831            .client_downcast_map
2832            .borrow_mut()
2833            .remove(&((self) as *const _ as usize))
2834            .is_some());
2835    }
2836}
2837
2838/// A `ClientHook` that initially wraps one client and then, later on, redirects
2839/// to some other client.
2840struct PromiseClient<VatId>
2841where
2842    VatId: 'static,
2843{
2844    connection_state: Rc<ConnectionState<VatId>>,
2845    is_resolved: bool,
2846    cap: Box<dyn ClientHook>,
2847    import_id: Option<ImportId>,
2848    received_call: bool,
2849    resolution_waiters: crate::sender_queue::SenderQueue<(), Box<dyn ClientHook>>,
2850}
2851
2852impl<VatId> PromiseClient<VatId> {
2853    fn new(
2854        connection_state: &Rc<ConnectionState<VatId>>,
2855        initial: Box<dyn ClientHook>,
2856        import_id: Option<ImportId>,
2857    ) -> Rc<RefCell<Self>> {
2858        Rc::new(RefCell::new(Self {
2859            connection_state: connection_state.clone(),
2860            is_resolved: false,
2861            cap: initial,
2862            import_id,
2863            received_call: false,
2864            resolution_waiters: crate::sender_queue::SenderQueue::new(),
2865        }))
2866    }
2867
2868    fn resolve(&mut self, replacement: Result<Box<dyn ClientHook>, Error>) {
2869        let (mut replacement, is_error) = match replacement {
2870            Ok(v) => (v, false),
2871            Err(e) => (broken::new_cap(e), true),
2872        };
2873        let connection_state = self.connection_state.clone();
2874        let is_connected = connection_state.connection.borrow().is_ok();
2875        let replacement_brand = replacement.get_brand();
2876        if replacement_brand != connection_state.get_brand()
2877            && self.received_call
2878            && !is_error
2879            && is_connected
2880        {
2881            // The new capability is hosted locally, not on the remote machine.  And, we had made calls
2882            // to the promise.  We need to make sure those calls echo back to us before we allow new
2883            // calls to go directly to the local capability, so we need to set a local embargo and send
2884            // a `Disembargo` to echo through the peer.
2885            let (fulfiller, promise) = oneshot::channel::<Result<(), Error>>();
2886            let promise = promise
2887                .map_err(crate::canceled_to_error)
2888                .and_then(future::ready);
2889            let embargo = Embargo::new(fulfiller);
2890            let embargo_id = connection_state.embargoes.borrow_mut().push(embargo);
2891
2892            let mut message = connection_state
2893                .new_outgoing_message(50)
2894                .expect("no connection?"); // XXX size hint
2895            {
2896                let root: message::Builder = message.get_body().unwrap().init_as();
2897                let mut disembargo = root.init_disembargo();
2898                disembargo
2899                    .reborrow()
2900                    .init_context()
2901                    .set_sender_loopback(embargo_id);
2902                let target = disembargo.init_target();
2903
2904                let redirect = connection_state.write_target(&*self.cap, target);
2905                if redirect.is_some() {
2906                    panic!("Original promise target should always be from this RPC connection.")
2907                }
2908            }
2909
2910            // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
2911            let embargo_promise = promise.map_ok(move |()| replacement);
2912
2913            let mut queued_client = queued::Client::new(None);
2914            let weak_queued = Rc::downgrade(&queued_client.inner);
2915
2916            queued_client.drive(embargo_promise.then(move |r| {
2917                if let Some(q) = weak_queued.upgrade() {
2918                    queued::ClientInner::resolve(&q, r);
2919                }
2920                Promise::ok(())
2921            }));
2922
2923            // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
2924            // client instead.
2925            replacement = Box::new(queued_client);
2926
2927            let _ = message.send();
2928        }
2929
2930        for ((), waiter) in self.resolution_waiters.drain() {
2931            let _ = waiter.send(replacement.clone());
2932        }
2933
2934        let old_cap = mem::replace(&mut self.cap, replacement);
2935        connection_state.add_task(async move {
2936            drop(old_cap);
2937            Ok(())
2938        });
2939
2940        self.is_resolved = true;
2941    }
2942}
2943
2944impl<VatId> Drop for PromiseClient<VatId> {
2945    fn drop(&mut self) {
2946        let self_ptr = (self) as *const _ as usize;
2947
2948        if let Some(id) = self.import_id {
2949            // This object is representing an import promise.  That means the import table may still
2950            // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
2951            // the import still exists and the pointer still points back to this object because this
2952            // object may actually outlive the import.
2953            let slots = &mut self.connection_state.imports.borrow_mut().slots;
2954            if let Some(import) = slots.get_mut(&id) {
2955                if let Some(c) = &import.app_client {
2956                    if let Some(cs) = c.upgrade() {
2957                        if cs.get_ptr() == self_ptr {
2958                            import.app_client = None;
2959                        }
2960                    }
2961                }
2962            }
2963        }
2964
2965        assert!(self
2966            .connection_state
2967            .client_downcast_map
2968            .borrow_mut()
2969            .remove(&self_ptr)
2970            .is_some());
2971    }
2972}
2973
2974impl<VatId> From<Rc<RefCell<PromiseClient<VatId>>>> for Client<VatId> {
2975    fn from(client: Rc<RefCell<PromiseClient<VatId>>>) -> Self {
2976        let connection_state = client.borrow().connection_state.clone();
2977        Self::new(&connection_state, ClientVariant::Promise(client))
2978    }
2979}
2980
2981impl<VatId> Client<VatId> {
2982    fn new(connection_state: &Rc<ConnectionState<VatId>>, variant: ClientVariant<VatId>) -> Self {
2983        let client = Self {
2984            connection_state: connection_state.clone(),
2985            variant,
2986            flow_controller: Rc::new(RefCell::new(None)),
2987        };
2988        let weak = client.downgrade();
2989
2990        // XXX arguably, this should go in each of the variant's constructors.
2991        connection_state
2992            .client_downcast_map
2993            .borrow_mut()
2994            .insert(client.get_ptr(), weak);
2995        client
2996    }
2997    fn downgrade(&self) -> WeakClient<VatId> {
2998        let variant = match &self.variant {
2999            ClientVariant::Import(import_client) => {
3000                WeakClientVariant::Import(Rc::downgrade(import_client))
3001            }
3002            ClientVariant::Pipeline(pipeline_client) => {
3003                WeakClientVariant::Pipeline(Rc::downgrade(pipeline_client))
3004            }
3005            ClientVariant::Promise(promise_client) => {
3006                WeakClientVariant::Promise(Rc::downgrade(promise_client))
3007            }
3008        };
3009        WeakClient {
3010            connection_state: Rc::downgrade(&self.connection_state),
3011            variant,
3012            flow_controller: Rc::downgrade(&self.flow_controller),
3013        }
3014    }
3015
3016    fn from_ptr(ptr: usize, connection_state: &ConnectionState<VatId>) -> Option<Self> {
3017        match connection_state.client_downcast_map.borrow().get(&ptr) {
3018            Some(c) => c.upgrade(),
3019            None => None,
3020        }
3021    }
3022
3023    fn write_target(
3024        &self,
3025        mut target: crate::rpc_capnp::message_target::Builder,
3026    ) -> Option<Box<dyn ClientHook>> {
3027        match &self.variant {
3028            ClientVariant::Import(import_client) => {
3029                target.set_imported_cap(import_client.borrow().import_id);
3030                None
3031            }
3032            ClientVariant::Pipeline(pipeline_client) => {
3033                let mut builder = target.init_promised_answer();
3034                let question_ref = &pipeline_client.borrow().question_ref;
3035                builder.set_question_id(question_ref.borrow().id);
3036                let mut transform =
3037                    builder.init_transform(pipeline_client.borrow().ops.len() as u32);
3038                for idx in 0..pipeline_client.borrow().ops.len() {
3039                    if let ::capnp::private::capability::PipelineOp::GetPointerField(ordinal) =
3040                        pipeline_client.borrow().ops[idx]
3041                    {
3042                        transform
3043                            .reborrow()
3044                            .get(idx as u32)
3045                            .set_get_pointer_field(ordinal);
3046                    }
3047                }
3048                None
3049            }
3050            ClientVariant::Promise(promise_client) => {
3051                promise_client.borrow_mut().received_call = true;
3052                self.connection_state
3053                    .write_target(&*promise_client.borrow().cap, target)
3054            }
3055        }
3056    }
3057
3058    fn write_descriptor(&self, mut descriptor: cap_descriptor::Builder) -> Option<u32> {
3059        match &self.variant {
3060            ClientVariant::Import(import_client) => {
3061                descriptor.set_receiver_hosted(import_client.borrow().import_id);
3062                None
3063            }
3064            ClientVariant::Pipeline(pipeline_client) => {
3065                let mut promised_answer = descriptor.init_receiver_answer();
3066                let question_ref = &pipeline_client.borrow().question_ref;
3067                promised_answer.set_question_id(question_ref.borrow().id);
3068                let mut transform =
3069                    promised_answer.init_transform(pipeline_client.borrow().ops.len() as u32);
3070                for idx in 0..pipeline_client.borrow().ops.len() {
3071                    if let ::capnp::private::capability::PipelineOp::GetPointerField(ordinal) =
3072                        pipeline_client.borrow().ops[idx]
3073                    {
3074                        transform
3075                            .reborrow()
3076                            .get(idx as u32)
3077                            .set_get_pointer_field(ordinal);
3078                    }
3079                }
3080
3081                None
3082            }
3083            ClientVariant::Promise(promise_client) => {
3084                promise_client.borrow_mut().received_call = true;
3085
3086                ConnectionState::write_descriptor(
3087                    &self.connection_state.clone(),
3088                    promise_client.borrow().cap.clone(),
3089                    descriptor,
3090                )
3091                .unwrap()
3092            }
3093        }
3094    }
3095}
3096
3097impl<VatId> Clone for Client<VatId> {
3098    fn clone(&self) -> Self {
3099        let variant = match &self.variant {
3100            ClientVariant::Import(import_client) => ClientVariant::Import(import_client.clone()),
3101            ClientVariant::Pipeline(pipeline_client) => {
3102                ClientVariant::Pipeline(pipeline_client.clone())
3103            }
3104            ClientVariant::Promise(promise_client) => {
3105                ClientVariant::Promise(promise_client.clone())
3106            }
3107        };
3108        Self {
3109            connection_state: self.connection_state.clone(),
3110            variant,
3111            flow_controller: self.flow_controller.clone(),
3112        }
3113    }
3114}
3115
3116impl<VatId> ClientHook for Client<VatId> {
3117    fn add_ref(&self) -> Box<dyn ClientHook> {
3118        Box::new(self.clone())
3119    }
3120    fn new_call(
3121        &self,
3122        interface_id: u64,
3123        method_id: u16,
3124        size_hint: Option<::capnp::MessageSize>,
3125    ) -> ::capnp::capability::Request<any_pointer::Owned, any_pointer::Owned> {
3126        let request: Box<dyn RequestHook> =
3127            match Request::new(self.connection_state.clone(), size_hint, self.clone()) {
3128                Ok(mut request) => {
3129                    {
3130                        let mut call_builder = request.init_call();
3131                        call_builder.set_interface_id(interface_id);
3132                        call_builder.set_method_id(method_id);
3133                    }
3134                    Box::new(request)
3135                }
3136                Err(e) => Box::new(broken::Request::new(e, None)),
3137            };
3138
3139        ::capnp::capability::Request::new(request)
3140    }
3141
3142    fn call(
3143        &self,
3144        interface_id: u64,
3145        method_id: u16,
3146        params: Box<dyn ParamsHook>,
3147        mut results: Box<dyn ResultsHook>,
3148    ) -> Promise<(), Error> {
3149        // Implement call() by copying params and results messages.
3150
3151        let maybe_request = params.get().and_then(|p| {
3152            let mut request = p
3153                .target_size()
3154                .map(|s| self.new_call(interface_id, method_id, Some(s)))?;
3155            request.get().set_as(p)?;
3156            Ok(request)
3157        });
3158
3159        match maybe_request {
3160            Err(e) => Promise::err(e),
3161            Ok(request) => {
3162                let ::capnp::capability::RemotePromise { promise, .. } = request.send();
3163
3164                Promise::from_future(async move {
3165                    let response = promise.await?;
3166                    results.get()?.set_as(response.get()?)?;
3167                    Ok(())
3168                })
3169            }
3170        }
3171        // TODO implement this in terms of direct tail call.
3172        // We can and should propagate cancellation.
3173        // (TODO ?)
3174        // context -> allowCancellation();
3175
3176        //results.direct_tail_call(request.hook)
3177    }
3178
3179    fn get_ptr(&self) -> usize {
3180        match &self.variant {
3181            ClientVariant::Import(import_client) => (&*import_client.borrow()) as *const _ as usize,
3182            ClientVariant::Pipeline(pipeline_client) => {
3183                (&*pipeline_client.borrow()) as *const _ as usize
3184            }
3185            ClientVariant::Promise(promise_client) => {
3186                (&*promise_client.borrow()) as *const _ as usize
3187            }
3188        }
3189    }
3190
3191    fn get_brand(&self) -> usize {
3192        self.connection_state.get_brand()
3193    }
3194
3195    fn get_resolved(&self) -> Option<Box<dyn ClientHook>> {
3196        match &self.variant {
3197            ClientVariant::Import(_import_client) => None,
3198            ClientVariant::Pipeline(_pipeline_client) => None,
3199            ClientVariant::Promise(promise_client) => {
3200                if promise_client.borrow().is_resolved {
3201                    Some(promise_client.borrow().cap.clone())
3202                } else {
3203                    None
3204                }
3205            }
3206        }
3207    }
3208
3209    fn when_more_resolved(&self) -> Option<Promise<Box<dyn ClientHook>, Error>> {
3210        match &self.variant {
3211            ClientVariant::Import(_import_client) => None,
3212            ClientVariant::Pipeline(_pipeline_client) => None,
3213            ClientVariant::Promise(promise_client) => {
3214                Some(promise_client.borrow_mut().resolution_waiters.push(()))
3215            }
3216        }
3217    }
3218
3219    fn when_resolved(&self) -> Promise<(), Error> {
3220        default_when_resolved_impl(self)
3221    }
3222}
3223
3224pub(crate) fn default_when_resolved_impl<C>(client: &C) -> Promise<(), Error>
3225where
3226    C: ClientHook,
3227{
3228    match client.when_more_resolved() {
3229        Some(promise) => {
3230            Promise::from_future(promise.and_then(|resolution| resolution.when_resolved()))
3231        }
3232        None => Promise::ok(()),
3233    }
3234}
3235
3236// ===================================
3237
3238struct SingleCapPipeline {
3239    cap: Box<dyn ClientHook>,
3240}
3241
3242impl SingleCapPipeline {
3243    fn new(cap: Box<dyn ClientHook>) -> Self {
3244        Self { cap }
3245    }
3246}
3247
3248impl PipelineHook for SingleCapPipeline {
3249    fn add_ref(&self) -> Box<dyn PipelineHook> {
3250        Box::new(Self {
3251            cap: self.cap.clone(),
3252        })
3253    }
3254    fn get_pipelined_cap(&self, ops: &[PipelineOp]) -> Box<dyn ClientHook> {
3255        if ops.is_empty() {
3256            self.cap.add_ref()
3257        } else {
3258            broken::new_cap(Error::failed("Invalid pipeline transform.".to_string()))
3259        }
3260    }
3261}