1use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use capnp::any_pointer;
26use capnp::capability::Promise;
27use capnp::private::capability::{
28 ClientHook, ParamsHook, PipelineHook, PipelineOp, RequestHook, ResponseHook, ResultsHook,
29};
30use capnp::Error;
31
32use futures::channel::oneshot;
33use futures::{future, Future, FutureExt, TryFutureExt};
34
35use std::cell::{Cell, RefCell};
36use std::cmp::Reverse;
37use std::collections::binary_heap::BinaryHeap;
38use std::collections::hash_map::{self, HashMap};
39use std::mem;
40use std::rc::{Rc, Weak};
41
42use crate::attach::Attach;
43use crate::local::ResultsDoneHook;
44use crate::rpc_capnp::{
45 bootstrap, call, cap_descriptor, disembargo, exception, finish, message, message_target,
46 payload, promised_answer, resolve, return_,
47};
48use crate::task_set::TaskSet;
49use crate::{broken, local, queued};
50
51pub type QuestionId = u32;
52pub type AnswerId = QuestionId;
53pub type ExportId = u32;
54pub type ImportId = ExportId;
55
/// Table of entries looked up by a 32-bit ID.
///
/// In this file it backs the `answers` and `imports` tables of `ConnectionState`, whose
/// keys are read out of messages received from the peer.
pub struct ImportTable<T> {
    /// Entries keyed by ID.
    slots: HashMap<u32, T>,
}

impl<T> ImportTable<T> {
    /// Creates a table that holds no entries.
    pub fn new() -> Self {
        let slots = HashMap::new();
        Self { slots }
    }
}
67
/// Table of entries addressed by small integer IDs that this table allocates itself.
///
/// An ID is an index into `slots`. IDs of erased entries are remembered in `free_ids`
/// and handed out again, smallest first, before the table grows.
struct ExportTable<T> {
    /// Entry storage; `None` marks an ID that is currently unused.
    slots: Vec<Option<T>>,

    /// Min-heap (via `Reverse`) of erased IDs that are available for reuse.
    free_ids: BinaryHeap<Reverse<u32>>,
}

/// Iterator over the occupied entries of an [`ExportTable`], in increasing ID order.
struct ExportTableIter<'a, T>
where
    T: 'a,
{
    table: &'a ExportTable<T>,
    /// Index of the next slot to inspect.
    idx: usize,
}

impl<'a, T> ::std::iter::Iterator for ExportTableIter<'a, T>
where
    T: 'a,
{
    type Item = &'a T;

    /// Advances past empty slots and yields the next occupied entry, if any.
    fn next(&mut self) -> Option<&'a T> {
        // Copy the `&'a` reference out so the yielded borrow is not tied to `&mut self`.
        let table = self.table;
        while let Some(slot) = table.slots.get(self.idx) {
            self.idx += 1;
            if let Some(value) = slot {
                return Some(value);
            }
        }
        None
    }
}

impl<T> ExportTable<T> {
    /// Creates an empty table.
    pub fn new() -> Self {
        Self {
            slots: Vec::new(),
            free_ids: BinaryHeap::new(),
        }
    }

    /// Removes the entry stored under `id` and makes the ID available for reuse.
    ///
    /// Erasing an ID that is out of range or already empty is a no-op. In particular the
    /// ID is only recorded as free when an entry was actually removed; otherwise the same
    /// ID would sit in `free_ids` twice and two later `push` calls would both receive it,
    /// the second silently overwriting the first.
    pub fn erase(&mut self, id: u32) {
        let Some(slot) = self.slots.get_mut(id as usize) else {
            return;
        };
        if slot.take().is_some() {
            self.free_ids.push(Reverse(id));
        }
    }

    /// Stores `val` and returns the ID assigned to it.
    ///
    /// The smallest previously erased ID is reused when one exists; otherwise the table
    /// grows by one slot.
    pub fn push(&mut self, val: T) -> u32 {
        if let Some(Reverse(id)) = self.free_ids.pop() {
            self.slots[id as usize] = Some(val);
            id
        } else {
            let id = self.slots.len() as u32;
            self.slots.push(Some(val));
            id
        }
    }

    /// Returns a mutable reference to the entry under `id`, or `None` when the ID is out
    /// of range or its slot is empty.
    pub fn find(&mut self, id: u32) -> Option<&mut T> {
        self.slots.get_mut(id as usize).and_then(Option::as_mut)
    }

    /// Iterates over all occupied entries in increasing ID order.
    pub fn iter(&self) -> ExportTableIter<'_, T> {
        ExportTableIter {
            table: self,
            idx: 0,
        }
    }
}
142
143struct Question<VatId>
144where
145 VatId: 'static,
146{
147 is_awaiting_return: bool,
148
149 #[allow(dead_code)]
150 param_exports: Vec<ExportId>,
151
152 #[allow(dead_code)]
153 is_tail_call: bool,
154
155 self_ref: Option<Weak<RefCell<QuestionRef<VatId>>>>,
157
158 skip_finish: bool,
160}
161
162impl<VatId> Question<VatId> {
163 fn new() -> Self {
164 Self {
165 is_awaiting_return: true,
166 param_exports: Vec::new(),
167 is_tail_call: false,
168 self_ref: None,
169 skip_finish: false,
170 }
171 }
172}
173
174struct QuestionRef<VatId>
177where
178 VatId: 'static,
179{
180 connection_state: Rc<ConnectionState<VatId>>,
181 id: QuestionId,
182 fulfiller: Option<oneshot::Sender<Promise<Response<VatId>, Error>>>,
183}
184
185impl<VatId> QuestionRef<VatId> {
186 fn new(
187 state: Rc<ConnectionState<VatId>>,
188 id: QuestionId,
189 fulfiller: oneshot::Sender<Promise<Response<VatId>, Error>>,
190 ) -> Self {
191 Self {
192 connection_state: state,
193 id,
194 fulfiller: Some(fulfiller),
195 }
196 }
197 fn fulfill(&mut self, response: Promise<Response<VatId>, Error>) {
198 if let Some(fulfiller) = self.fulfiller.take() {
199 let _ = fulfiller.send(response);
200 }
201 }
202
203 fn reject(&mut self, err: Error) {
204 if let Some(fulfiller) = self.fulfiller.take() {
205 let _ = fulfiller.send(Promise::err(err));
206 }
207 }
208}
209
210impl<VatId> Drop for QuestionRef<VatId> {
211 fn drop(&mut self) {
212 let mut questions = self.connection_state.questions.borrow_mut();
213 let Some(q) = &mut questions.slots[self.id as usize] else {
214 unreachable!()
215 };
216 if let Ok(ref mut c) = *self.connection_state.connection.borrow_mut() {
217 if !q.skip_finish {
218 let mut message = c.new_outgoing_message(5);
219 {
220 let root: message::Builder = message.get_body().unwrap().init_as();
221 let mut builder = root.init_finish();
222 builder.set_question_id(self.id);
223
224 builder.set_release_result_caps(q.is_awaiting_return);
230 }
231 let _ = message.send();
232 }
233 }
234
235 if q.is_awaiting_return {
236 q.self_ref = None;
238 } else {
239 questions.erase(self.id)
241 }
242 }
243}
244
245struct Answer<VatId>
246where
247 VatId: 'static,
248{
249 return_has_been_sent: bool,
250
251 pipeline: Option<Box<dyn PipelineHook>>,
253
254 redirected_results: Option<Promise<Response<VatId>, Error>>,
257
258 received_finish: Rc<Cell<bool>>,
259 call_completion_promise: Option<Promise<(), Error>>,
260
261 result_exports: Vec<ExportId>,
264}
265
266impl<VatId> Answer<VatId> {
267 fn new() -> Self {
268 Self {
269 return_has_been_sent: false,
270 pipeline: None,
271 redirected_results: None,
272 received_finish: Rc::new(Cell::new(false)),
273 call_completion_promise: None,
274 result_exports: Vec::new(),
275 }
276 }
277}
278
279pub struct Export {
280 refcount: u32,
281
282 canonical: bool,
285
286 client_hook: Box<dyn ClientHook>,
287
288 resolve_op: Promise<(), Error>,
291}
292
293impl Export {
294 fn new(client_hook: Box<dyn ClientHook>) -> Self {
295 Self {
296 refcount: 1,
297 canonical: false,
298 client_hook,
299 resolve_op: Promise::err(Error::failed("no resolve op".to_string())),
300 }
301 }
302}
303
304pub struct Import<VatId>
305where
306 VatId: 'static,
307{
308 import_client: Weak<RefCell<ImportClient<VatId>>>,
309
310 app_client: Option<WeakClient<VatId>>,
314
315 promise_client_to_resolve: Option<Weak<RefCell<PromiseClient<VatId>>>>,
317}
318
319impl<VatId> Import<VatId> {
320 fn new(import_client: &Rc<RefCell<ImportClient<VatId>>>) -> Self {
321 Self {
322 import_client: Rc::downgrade(import_client),
323 app_client: None,
324 promise_client_to_resolve: None,
325 }
326 }
327}
328
329struct Embargo {
330 fulfiller: Option<oneshot::Sender<Result<(), Error>>>,
331}
332
333impl Embargo {
334 fn new(fulfiller: oneshot::Sender<Result<(), Error>>) -> Self {
335 Self {
336 fulfiller: Some(fulfiller),
337 }
338 }
339}
340
341fn to_pipeline_ops(
342 ops: ::capnp::struct_list::Reader<promised_answer::op::Owned>,
343) -> ::capnp::Result<Vec<PipelineOp>> {
344 let mut result = Vec::new();
345 for op in ops {
346 match op.which()? {
347 promised_answer::op::Noop(()) => {
348 result.push(PipelineOp::Noop);
349 }
350 promised_answer::op::GetPointerField(idx) => {
351 result.push(PipelineOp::GetPointerField(idx));
352 }
353 }
354 }
355 Ok(result)
356}
357
358fn from_error(error: &Error, mut builder: exception::Builder) {
359 let typ = match error.kind {
360 ::capnp::ErrorKind::Failed => exception::Type::Failed,
361 ::capnp::ErrorKind::Overloaded => exception::Type::Overloaded,
362 ::capnp::ErrorKind::Disconnected => exception::Type::Disconnected,
363 ::capnp::ErrorKind::Unimplemented => exception::Type::Unimplemented,
364 ::capnp::ErrorKind::SettingDynamicCapabilitiesIsUnsupported => {
365 exception::Type::Unimplemented
366 }
367 _ => exception::Type::Failed,
368 };
369 builder.set_type(typ);
370 match error.kind {
371 ::capnp::ErrorKind::Failed
372 | ::capnp::ErrorKind::Overloaded
373 | ::capnp::ErrorKind::Disconnected
374 | ::capnp::ErrorKind::Unimplemented => {
375 builder.set_reason(&error.extra);
376 }
377 _ => {
378 builder.set_reason(error.to_string());
382 }
383 }
384}
385
386fn remote_exception_to_error(exception: exception::Reader) -> Error {
387 let (kind, reason) = match (exception.get_type(), exception.get_reason()) {
388 (Ok(exception::Type::Failed), Ok(reason)) => (::capnp::ErrorKind::Failed, reason),
389 (Ok(exception::Type::Overloaded), Ok(reason)) => (::capnp::ErrorKind::Overloaded, reason),
390 (Ok(exception::Type::Disconnected), Ok(reason)) => {
391 (::capnp::ErrorKind::Disconnected, reason)
392 }
393 (Ok(exception::Type::Unimplemented), Ok(reason)) => {
394 (::capnp::ErrorKind::Unimplemented, reason)
395 }
396 _ => (::capnp::ErrorKind::Failed, "(malformed error)".into()),
397 };
398 let reason_str = reason
399 .to_str()
400 .unwrap_or("<malformed utf-8 in error reason>");
401 Error {
402 extra: format!("remote exception: {reason_str}"),
403 kind,
404 }
405}
406
407pub struct ConnectionErrorHandler<VatId>
408where
409 VatId: 'static,
410{
411 weak_state: Weak<ConnectionState<VatId>>,
412}
413
414impl<VatId> ConnectionErrorHandler<VatId> {
415 fn new(weak_state: Weak<ConnectionState<VatId>>) -> Self {
416 Self { weak_state }
417 }
418}
419
420impl<VatId> crate::task_set::TaskReaper<capnp::Error> for ConnectionErrorHandler<VatId> {
421 fn task_failed(&mut self, error: ::capnp::Error) {
422 if let Some(state) = self.weak_state.upgrade() {
423 state.disconnect(error)
424 }
425 }
426}
427
428pub struct ConnectionState<VatId>
429where
430 VatId: 'static,
431{
432 bootstrap_cap: Box<dyn ClientHook>,
433 exports: RefCell<ExportTable<Export>>,
434 questions: RefCell<ExportTable<Question<VatId>>>,
435 answers: RefCell<ImportTable<Answer<VatId>>>,
436 imports: RefCell<ImportTable<Import<VatId>>>,
437
438 exports_by_cap: RefCell<HashMap<usize, ExportId>>,
440
441 embargoes: RefCell<ExportTable<Embargo>>,
442
443 tasks: RefCell<Option<crate::task_set::TaskSetHandle<capnp::Error>>>,
444 connection: RefCell<::std::result::Result<Box<dyn crate::Connection<VatId>>, ::capnp::Error>>,
445 disconnect_fulfiller: RefCell<Option<oneshot::Sender<Promise<(), Error>>>>,
446
447 client_downcast_map: RefCell<HashMap<usize, WeakClient<VatId>>>,
448}
449
450impl<VatId> ConnectionState<VatId> {
451 pub fn new(
452 bootstrap_cap: Box<dyn ClientHook>,
453 connection: Box<dyn crate::Connection<VatId>>,
454 disconnect_fulfiller: oneshot::Sender<Promise<(), Error>>,
455 ) -> (TaskSet<Error>, Rc<Self>) {
456 let state = Rc::new(Self {
457 bootstrap_cap,
458 exports: RefCell::new(ExportTable::new()),
459 questions: RefCell::new(ExportTable::new()),
460 answers: RefCell::new(ImportTable::new()),
461 imports: RefCell::new(ImportTable::new()),
462 exports_by_cap: RefCell::new(HashMap::new()),
463 embargoes: RefCell::new(ExportTable::new()),
464 tasks: RefCell::new(None),
465 connection: RefCell::new(Ok(connection)),
466 disconnect_fulfiller: RefCell::new(Some(disconnect_fulfiller)),
467 client_downcast_map: RefCell::new(HashMap::new()),
468 });
469 let (mut handle, tasks) =
470 TaskSet::new(Box::new(ConnectionErrorHandler::new(Rc::downgrade(&state))));
471
472 handle.add(Self::message_loop(Rc::downgrade(&state)));
473 *state.tasks.borrow_mut() = Some(handle);
474 (tasks, state)
475 }
476
477 fn new_outgoing_message(
478 &self,
479 first_segment_words: u32,
480 ) -> capnp::Result<Box<dyn crate::OutgoingMessage>> {
481 match self.connection.borrow_mut().as_mut() {
482 Err(e) => Err(e.clone()),
483 Ok(c) => Ok(c.new_outgoing_message(first_segment_words)),
484 }
485 }
486
487 fn disconnect(&self, error: ::capnp::Error) {
488 if self.connection.borrow().is_err() {
489 return;
491 }
492
493 let mut pipelines_to_release = Vec::new();
496 let mut clients_to_release = Vec::new();
497 let mut resolve_ops_to_release = Vec::new();
499
500 for q in self.questions.borrow().iter() {
501 if let Some(ref weak_question_ref) = q.self_ref {
502 if let Some(question_ref) = weak_question_ref.upgrade() {
503 question_ref.borrow_mut().reject(error.clone());
504 }
505 }
506 }
507
508 {
509 let answer_slots = &mut self.answers.borrow_mut().slots;
510 for (_, ref mut answer) in answer_slots.iter_mut() {
511 pipelines_to_release.push(answer.pipeline.take())
513 }
514 }
515
516 let len = self.exports.borrow().slots.len();
517 for idx in 0..len {
518 if let Some(exp) = self.exports.borrow_mut().slots[idx].take() {
519 let Export {
520 client_hook,
521 resolve_op,
522 ..
523 } = exp;
524 clients_to_release.push(client_hook);
525 resolve_ops_to_release.push(resolve_op);
526 }
527 }
528 *self.exports.borrow_mut() = ExportTable::new();
529
530 {
531 let import_slots = &mut self.imports.borrow_mut().slots;
532 for (_, ref mut import) in import_slots.iter_mut() {
533 if let Some(f) = import.promise_client_to_resolve.take() {
534 if let Some(promise_client) = f.upgrade() {
535 promise_client.borrow_mut().resolve(Err(error.clone()));
536 }
537 }
538 }
539 }
540
541 let len = self.embargoes.borrow().slots.len();
542 for idx in 0..len {
543 if let Some(ref mut emb) = self.embargoes.borrow_mut().slots[idx] {
544 if let Some(f) = emb.fulfiller.take() {
545 let _ = f.send(Err(error.clone()));
546 }
547 }
548 }
549 *self.embargoes.borrow_mut() = ExportTable::new();
550
551 drop(pipelines_to_release);
552 drop(clients_to_release);
553 drop(resolve_ops_to_release);
554 match *self.connection.borrow_mut() {
557 Ok(ref mut c) => {
558 let mut message = c.new_outgoing_message(100); {
560 let builder = message
561 .get_body()
562 .unwrap()
563 .init_as::<message::Builder>()
564 .init_abort();
565 from_error(&error, builder);
566 }
567 let _ = message.send();
568 }
569 Err(_) => unreachable!(),
570 }
571
572 let connection = mem::replace(&mut *self.connection.borrow_mut(), Err(error.clone()));
573
574 let Ok(mut c) = connection else {
575 unreachable!()
576 };
577 let promise = c.shutdown(Err(error)).then(|r| match r {
578 Ok(()) => Promise::ok(()),
579 Err(e) => {
580 if e.kind != ::capnp::ErrorKind::Disconnected {
581 Promise::err(e)
583 } else {
584 Promise::ok(())
585 }
586 }
587 });
588 let Some(fulfiller) = self.disconnect_fulfiller.borrow_mut().take() else {
589 unreachable!()
590 };
591 let _ = fulfiller.send(Promise::from_future(promise.attach(c)));
592 }
593
594 fn eagerly_evaluate<T, F>(&self, task: F) -> Promise<T, Error>
597 where
598 F: Future<Output = Result<T, Error>> + 'static + Unpin,
599 T: 'static,
600 {
601 let (tx, rx) = oneshot::channel::<Result<T, Error>>();
602 let (tx2, rx2) = oneshot::channel::<()>();
603 let f1 = Box::pin(task.map(move |r| {
604 let _ = tx.send(r);
605 })) as Pin<Box<dyn Future<Output = ()> + Unpin>>;
606 let f2 = Box::pin(rx2.map(drop)) as Pin<Box<dyn Future<Output = ()> + Unpin>>;
607
608 self.add_task(future::select(f1, f2).map(|_| Ok(())));
609 Promise::from_future(rx.map_err(crate::canceled_to_error).map(|r| {
610 drop(tx2);
611 r?
612 }))
613 }
614
615 fn add_task<F>(&self, task: F)
616 where
617 F: Future<Output = Result<(), Error>> + 'static,
618 {
619 if let Some(ref mut tasks) = *self.tasks.borrow_mut() {
620 tasks.add(task);
621 }
622 }
623
624 pub fn bootstrap(state: &Rc<Self>) -> Box<dyn ClientHook> {
625 let question_id = state.questions.borrow_mut().push(Question::new());
626
627 let (fulfiller, promise) = oneshot::channel();
628 let promise = promise.map_err(crate::canceled_to_error);
629 let promise = promise.and_then(|response_promise| response_promise);
630 let question_ref = Rc::new(RefCell::new(QuestionRef::new(
631 state.clone(),
632 question_id,
633 fulfiller,
634 )));
635 let promise = promise.attach(question_ref.clone());
636 match state.questions.borrow_mut().slots[question_id as usize] {
637 Some(ref mut q) => {
638 q.self_ref = Some(Rc::downgrade(&question_ref));
639 }
640 None => unreachable!(),
641 }
642 match *state.connection.borrow_mut() {
643 Ok(ref mut c) => {
644 let mut message = c.new_outgoing_message(5);
645 {
646 let mut builder = message
647 .get_body()
648 .unwrap()
649 .init_as::<message::Builder>()
650 .init_bootstrap();
651 builder.set_question_id(question_id);
652 }
653 let _ = message.send();
654 }
655 Err(_) => panic!(),
656 }
657
658 let pipeline = Pipeline::new(state, question_ref, Some(Promise::from_future(promise)));
659 pipeline.get_pipelined_cap_move(Vec::new())
660 }
661
662 fn message_loop(weak_state: Weak<Self>) -> Promise<(), capnp::Error> {
663 let Some(state) = weak_state.upgrade() else {
664 return Promise::err(Error::disconnected(
665 "message loop cannot continue without a connection".into(),
666 ));
667 };
668
669 let promise = match *state.connection.borrow_mut() {
670 Err(_) => return Promise::ok(()),
671 Ok(ref mut connection) => connection.receive_incoming_message(),
672 };
673
674 Promise::from_future(async move {
675 match promise.await? {
676 Some(m) => {
677 Self::handle_message(&weak_state, m)?;
678 weak_state
679 .upgrade()
680 .expect("message loop outlived connection state?")
681 .add_task(Self::message_loop(weak_state));
682 }
683 None => {
684 weak_state
685 .upgrade()
686 .expect("message loop outlived connection state?")
687 .disconnect(Error::disconnected("Peer disconnected.".to_string()));
688 }
689 }
690 Ok(())
691 })
692 }
693
694 fn send_unimplemented(
695 connection_state: &Rc<Self>,
696 message: &dyn crate::IncomingMessage,
697 ) -> capnp::Result<()> {
698 let mut out_message = connection_state.new_outgoing_message(50)?; {
700 let mut root: message::Builder = out_message.get_body()?.get_as()?;
701 root.set_unimplemented(message.get_body()?.get_as()?)?;
702 }
703 let _ = out_message.send();
704 Ok(())
705 }
706
707 fn handle_unimplemented(
708 connection_state: &Rc<Self>,
709 message: message::Reader,
710 ) -> capnp::Result<()> {
711 match message.which()? {
712 message::Resolve(resolve) => {
713 let resolve = resolve?;
714 match resolve.which()? {
715 resolve::Cap(c) => match c?.which()? {
716 cap_descriptor::None(()) => (),
717 cap_descriptor::SenderHosted(export_id) => {
718 connection_state.release_export(export_id, 1)?;
719 }
720 cap_descriptor::SenderPromise(export_id) => {
721 connection_state.release_export(export_id, 1)?;
722 }
723 cap_descriptor::ReceiverAnswer(_) | cap_descriptor::ReceiverHosted(_) => (),
724 cap_descriptor::ThirdPartyHosted(_) => {
725 return Err(Error::failed(
726 "Peer claims we resolved a ThirdPartyHosted cap.".to_string(),
727 ));
728 }
729 },
730 resolve::Exception(_) => (),
731 }
732 }
733 _ => {
734 return Err(Error::failed(
735 "Peer did not implement required RPC message type.".to_string(),
736 ));
737 }
738 }
739 Ok(())
740 }
741
742 fn handle_bootstrap(
743 connection_state: &Rc<Self>,
744 bootstrap: bootstrap::Reader,
745 ) -> capnp::Result<()> {
746 use ::capnp::traits::ImbueMut;
747
748 let answer_id = bootstrap.get_question_id();
749 if connection_state.connection.borrow().is_err() {
750 return Ok(());
752 }
753
754 let mut response = connection_state.new_outgoing_message(10)?;
755
756 let result_exports = {
757 let mut ret = response
758 .get_body()?
759 .init_as::<message::Builder>()
760 .init_return();
761 ret.set_answer_id(answer_id);
762
763 let cap = connection_state.bootstrap_cap.clone();
764 let mut cap_table = Vec::new();
765 let mut payload = ret.init_results();
766 {
767 let mut content = payload.reborrow().get_content();
768 content.imbue_mut(&mut cap_table);
769 content.set_as_capability(cap);
770 }
771 assert_eq!(cap_table.len(), 1);
772
773 Self::write_descriptors(connection_state, &cap_table, payload)
774 };
775
776 let slots = &mut connection_state.answers.borrow_mut().slots;
777 let hash_map::Entry::Vacant(slot) = slots.entry(answer_id) else {
778 connection_state.release_exports(&result_exports)?;
779 return Err(Error::failed("questionId is already in use".to_string()));
780 };
781 let mut answer = Answer::new();
782 answer.return_has_been_sent = true;
783 answer.result_exports = result_exports;
784 answer.pipeline = Some(Box::new(SingleCapPipeline::new(
785 connection_state.bootstrap_cap.clone(),
786 )));
787 slot.insert(answer);
788
789 let _ = response.send();
790 Ok(())
791 }
792
793 fn handle_finish(connection_state: &Rc<Self>, finish: finish::Reader) -> capnp::Result<()> {
794 let mut exports_to_release = Vec::new();
795 let answer_id = finish.get_question_id();
796
797 let answers_slots = &mut connection_state.answers.borrow_mut().slots;
798 match answers_slots.entry(answer_id) {
799 hash_map::Entry::Vacant(_) => {
800 }
805 hash_map::Entry::Occupied(mut entry) => {
806 let answer = entry.get_mut();
807 answer.received_finish.set(true);
808
809 if finish.get_release_result_caps() {
810 exports_to_release = ::std::mem::take(&mut answer.result_exports);
811 }
812
813 answer.pipeline.take();
815 answer.call_completion_promise.take();
816
817 if answer.return_has_been_sent {
818 entry.remove();
819 }
820 }
821 }
822
823 connection_state.release_exports(&exports_to_release)?;
824 Ok(())
825 }
826
827 fn handle_resolve(connection_state: &Rc<Self>, resolve: resolve::Reader) -> capnp::Result<()> {
828 let replacement_or_error = match resolve.which()? {
829 resolve::Cap(c) => match Self::receive_cap(connection_state, c?)? {
830 Some(cap) => Ok(cap),
831 None => {
832 return Err(Error::failed(
833 "'Resolve' contained 'CapDescriptor.none'.".to_string(),
834 ));
835 }
836 },
837 resolve::Exception(e) => {
838 Err(remote_exception_to_error(e?))
843 }
844 };
845
846 let slots = &mut connection_state.imports.borrow_mut().slots;
848 if let Some(import) = slots.get_mut(&resolve.get_promise_id()) {
849 match import.promise_client_to_resolve.take() {
850 Some(weak_promise_client) => {
851 if let Some(promise_client) = weak_promise_client.upgrade() {
852 promise_client.borrow_mut().resolve(replacement_or_error);
853 }
854 }
855 None => {
856 return Err(Error::failed(
857 "Got 'Resolve' for a non-promise import.".to_string(),
858 ));
859 }
860 }
861 }
862 Ok(())
863 }
864
865 fn handle_disembargo(
866 connection_state: &Rc<Self>,
867 disembargo: disembargo::Reader,
868 ) -> capnp::Result<()> {
869 let context = disembargo.get_context();
870 match context.which()? {
871 disembargo::context::SenderLoopback(embargo_id) => {
872 let mut target = connection_state.get_message_target(disembargo.get_target()?)?;
873 while let Some(resolved) = target.get_resolved() {
874 target = resolved;
875 }
876
877 if target.get_brand() != connection_state.get_brand() {
878 return Err(Error::failed(
879 "'Disembargo' of type 'senderLoopback' sent to an object that does not point \
880 back to the sender.".to_string()));
881 }
882
883 let connection_state_ref = connection_state.clone();
884 let connection_state_ref1 = connection_state.clone();
885 let task = async move {
886 if let Ok(ref mut c) = *connection_state_ref.connection.borrow_mut() {
887 let mut message = c.new_outgoing_message(100); {
889 let root: message::Builder = message.get_body()?.init_as();
890 let mut disembargo = root.init_disembargo();
891 disembargo
892 .reborrow()
893 .init_context()
894 .set_receiver_loopback(embargo_id);
895
896 let redirect =
897 match Client::from_ptr(target.get_ptr(), &connection_state_ref1) {
898 Some(c) => c.write_target(disembargo.init_target()),
899 None => unreachable!(),
900 };
901 if redirect.is_some() {
902 return Err(Error::failed(
903 "'Disembargo' of type 'senderLoopback' sent to an object that \
904 does not appear to have been the subject of a previous \
905 'Resolve' message."
906 .to_string(),
907 ));
908 }
909 }
910 let _ = message.send();
911 }
912 Ok(())
913 };
914 connection_state.add_task(task);
915 }
916 disembargo::context::ReceiverLoopback(embargo_id) => {
917 if let Some(embargo) = connection_state.embargoes.borrow_mut().find(embargo_id) {
918 let fulfiller = embargo.fulfiller.take().unwrap();
919 let _ = fulfiller.send(Ok(()));
920 } else {
921 return Err(Error::failed(
922 "Invalid embargo ID in `Disembargo.context.receiverLoopback".to_string(),
923 ));
924 }
925 connection_state.embargoes.borrow_mut().erase(embargo_id);
926 }
927 disembargo::context::Accept(_) | disembargo::context::Provide(_) => {
928 return Err(Error::unimplemented(
929 "Disembargo::Context::Provide/Accept not implemented".to_string(),
930 ));
931 }
932 }
933 Ok(())
934 }
935
936 fn handle_message(
937 weak_state: &Weak<Self>,
938 message: Box<dyn crate::IncomingMessage>,
939 ) -> ::capnp::Result<()> {
940 let Some(connection_state) = weak_state.upgrade() else {
941 return Err(Error::disconnected(
942 "handle_message() cannot continue without a connection".into(),
943 ));
944 };
945
946 let reader = message.get_body()?.get_as::<message::Reader>()?;
947 match reader.which() {
948 Ok(message::Unimplemented(message)) => {
949 Self::handle_unimplemented(&connection_state, message?)?
950 }
951 Ok(message::Abort(abort)) => return Err(remote_exception_to_error(abort?)),
952 Ok(message::Bootstrap(bootstrap)) => {
953 Self::handle_bootstrap(&connection_state, bootstrap?)?
954 }
955 Ok(message::Call(call)) => {
956 let call = call?;
957 let capability = connection_state.get_message_target(call.get_target()?)?;
958 let (interface_id, method_id, question_id, cap_table_array, redirect_results) = {
959 let redirect_results = match call.get_send_results_to().which()? {
960 call::send_results_to::Caller(()) => false,
961 call::send_results_to::Yourself(()) => true,
962 call::send_results_to::ThirdParty(_) => {
963 return Err(Error::failed(
964 "Unsupported `Call.sendResultsTo`.".to_string(),
965 ))
966 }
967 };
968 let payload = call.get_params()?;
969
970 (
971 call.get_interface_id(),
972 call.get_method_id(),
973 call.get_question_id(),
974 Self::receive_caps(&connection_state, payload.get_cap_table()?)?,
975 redirect_results,
976 )
977 };
978
979 if connection_state
980 .answers
981 .borrow()
982 .slots
983 .contains_key(&question_id)
984 {
985 return Err(Error::failed(format!(
986 "Received a new call on in-use question id {question_id}"
987 )));
988 }
989
990 let params = Params::new(message, cap_table_array);
991
992 let answer = Answer::new();
993
994 let (results_inner_fulfiller, results_inner_promise) = oneshot::channel();
995 let results_inner_promise = results_inner_promise.map_err(crate::canceled_to_error);
996
997 let (pipeline_sender, mut pipeline) = queued::Pipeline::new();
998 let results = Results::new(
999 &connection_state,
1000 question_id,
1001 redirect_results,
1002 results_inner_fulfiller,
1003 answer.received_finish.clone(),
1004 Some(pipeline_sender.weak_clone()),
1005 );
1006
1007 let (redirected_results_done_promise, redirected_results_done_fulfiller) =
1008 if redirect_results {
1009 let (f, p) = oneshot::channel::<Result<Response<VatId>, Error>>();
1010 let p = p.map_err(crate::canceled_to_error).and_then(future::ready);
1011 (Some(Promise::from_future(p)), Some(f))
1012 } else {
1013 (None, None)
1014 };
1015
1016 {
1017 let slots = &mut connection_state.answers.borrow_mut().slots;
1018 let hash_map::Entry::Vacant(slot) = slots.entry(question_id) else {
1019 return Err(Error::failed("questionId is already in use".to_string()));
1020 };
1021 slot.insert(answer);
1022 }
1023
1024 let call_promise =
1025 capability.call(interface_id, method_id, Box::new(params), Box::new(results));
1026
1027 let promise = call_promise
1028 .then(move |call_result| {
1029 results_inner_promise.then(move |result| {
1030 future::ready(ResultsDone::from_results_inner(
1031 result,
1032 call_result,
1033 pipeline_sender,
1034 ))
1035 })
1036 })
1037 .then(move |v| {
1038 if let Some(f) = redirected_results_done_fulfiller {
1039 match v {
1040 Ok(r) => drop(f.send(Ok(Response::redirected(r.clone())))),
1041 Err(e) => drop(f.send(Err(e))),
1042 }
1043 }
1044 Promise::ok(())
1045 });
1046
1047 let fork = promise.shared();
1048 pipeline.drive(fork.clone());
1049
1050 {
1051 let slots = &mut connection_state.answers.borrow_mut().slots;
1052 let Some(answer) = slots.get_mut(&question_id) else {
1053 unreachable!()
1054 };
1055 answer.pipeline = Some(Box::new(pipeline));
1056 if redirect_results {
1057 answer.redirected_results = redirected_results_done_promise;
1058 } else {
1060 answer.call_completion_promise =
1061 Some(connection_state.eagerly_evaluate(fork));
1062 }
1063 }
1064 }
1065 Ok(message::Return(oret)) => {
1066 let ret = oret?;
1067 let question_id = ret.get_answer_id();
1068
1069 let mut questions = connection_state.questions.borrow_mut();
1070 match questions.slots[question_id as usize] {
1071 Some(ref mut question) => {
1072 question.is_awaiting_return = false;
1073 if ret.get_no_finish_needed() {
1074 question.skip_finish = true;
1075 }
1076 match question.self_ref {
1077 Some(ref question_ref) => match ret.which()? {
1078 return_::Results(results) => {
1079 let cap_table = Self::receive_caps(
1080 &connection_state,
1081 results?.get_cap_table()?,
1082 )?;
1083
1084 let question_ref =
1085 question_ref.upgrade().expect("dangling question ref?");
1086 let response = Response::new(
1087 connection_state.clone(),
1088 question_ref.clone(),
1089 message,
1090 cap_table,
1091 );
1092 question_ref.borrow_mut().fulfill(Promise::ok(response));
1093 }
1094 return_::Exception(e) => {
1095 let tmp =
1096 question_ref.upgrade().expect("dangling question ref?");
1097 tmp.borrow_mut().reject(remote_exception_to_error(e?));
1098 }
1099 return_::Canceled(_) => {
1100 Self::send_unimplemented(&connection_state, message.as_ref())?;
1101 }
1102 return_::ResultsSentElsewhere(_) => {
1103 Self::send_unimplemented(&connection_state, message.as_ref())?;
1104 }
1105 return_::TakeFromOtherQuestion(id) => {
1106 if let Some(answer) =
1107 connection_state.answers.borrow_mut().slots.get_mut(&id)
1108 {
1109 if let Some(res) = answer.redirected_results.take() {
1110 let tmp = question_ref
1111 .upgrade()
1112 .expect("dangling question ref?");
1113 tmp.borrow_mut().fulfill(res);
1114 } else {
1115 return Err(Error::failed("return.takeFromOtherQuestion referenced a call that \
1116 did not use sendResultsTo.yourself.".to_string()));
1117 }
1118 } else {
1119 return Err(Error::failed(
1120 "return.takeFromOtherQuestion had invalid answer ID."
1121 .to_string(),
1122 ));
1123 }
1124 }
1125 return_::AcceptFromThirdParty(_) => {
1126 drop(questions);
1127 Self::send_unimplemented(&connection_state, message.as_ref())?;
1128 }
1129 },
1130 None => {
1131 if let return_::TakeFromOtherQuestion(_) = ret.which()? {
1132 return Self::send_unimplemented(
1133 &connection_state,
1134 message.as_ref(),
1135 );
1136 }
1137 questions.erase(question_id);
1142 }
1143 }
1144 }
1145 None => {
1146 return Err(Error::failed(format!(
1147 "Invalid question ID in Return message: {question_id}"
1148 )));
1149 }
1150 }
1151 }
1152 Ok(message::Finish(finish)) => Self::handle_finish(&connection_state, finish?)?,
1153 Ok(message::Resolve(resolve)) => Self::handle_resolve(&connection_state, resolve?)?,
1154 Ok(message::Release(release)) => {
1155 let release = release?;
1156 connection_state.release_export(release.get_id(), release.get_reference_count())?;
1157 }
1158 Ok(message::Disembargo(disembargo)) => {
1159 Self::handle_disembargo(&connection_state, disembargo?)?
1160 }
1161 Ok(
1162 message::Provide(_)
1163 | message::Accept(_)
1164 | message::Join(_)
1165 | message::ObsoleteSave(_)
1166 | message::ObsoleteDelete(_),
1167 )
1168 | Err(::capnp::NotInSchema(_)) => {
1169 Self::send_unimplemented(&connection_state, message.as_ref())?;
1170 }
1171 }
1172 Ok(())
1173 }
1174
1175 fn answer_has_sent_return(&self, id: AnswerId, result_exports: Vec<ExportId>) {
1176 let answers_slots = &mut self.answers.borrow_mut().slots;
1177 let hash_map::Entry::Occupied(mut entry) = answers_slots.entry(id) else {
1178 unreachable!()
1179 };
1180 let a = entry.get_mut();
1181 a.return_has_been_sent = true;
1182 if a.received_finish.get() {
1183 entry.remove();
1184 } else {
1185 a.result_exports = result_exports;
1186 }
1187 }
1188
1189 fn release_export(&self, id: ExportId, refcount: u32) -> ::capnp::Result<()> {
1190 let mut exports = self.exports.borrow_mut();
1191 let Some(e) = exports.find(id) else {
1192 return Err(Error::failed(
1193 "Tried to release invalid export ID.".to_string(),
1194 ));
1195 };
1196 if refcount > e.refcount {
1197 return Err(Error::failed(
1198 "Tried to drop export's refcount below zero.".to_string(),
1199 ));
1200 }
1201 e.refcount -= refcount;
1202 if e.refcount == 0 {
1203 let client_ptr = e.client_hook.get_ptr();
1204 if e.canonical {
1205 self.exports_by_cap.borrow_mut().remove(&client_ptr);
1206 }
1207 exports.erase(id);
1208 }
1209 Ok(())
1210 }
1211
1212 fn release_exports(&self, exports: &[ExportId]) -> ::capnp::Result<()> {
1213 for &export_id in exports {
1214 self.release_export(export_id, 1)?;
1215 }
1216 Ok(())
1217 }
1218
1219 fn get_brand(&self) -> usize {
1220 self as *const _ as usize
1221 }
1222
1223 fn get_message_target(
1224 &self,
1225 target: message_target::Reader,
1226 ) -> ::capnp::Result<Box<dyn ClientHook>> {
1227 match target.which()? {
1228 message_target::ImportedCap(export_id) => {
1229 match self.exports.borrow().slots.get(export_id as usize) {
1230 Some(Some(exp)) => Ok(exp.client_hook.clone()),
1231 _ => Err(Error::failed(
1232 "Message target is not a current export ID.".to_string(),
1233 )),
1234 }
1235 }
1236 message_target::PromisedAnswer(promised_answer) => {
1237 let promised_answer = promised_answer?;
1238 let question_id = promised_answer.get_question_id();
1239
1240 let pipeline = match self.answers.borrow().slots.get(&question_id) {
1241 None => Box::new(broken::Pipeline::new(Error::failed(
1242 "Pipeline call on a request that returned no capabilities or was already closed.".to_string(),
1243 ))) as Box<dyn PipelineHook>,
1244 Some(base) => {
1245 match base.pipeline {
1246 Some(ref pipeline) => pipeline.add_ref(),
1247 None => Box::new(broken::Pipeline::new(Error::failed(
1248 "Pipeline call on a request that returned not capabilities or was \
1249 already closed."
1250 .to_string(),
1251 ))) as Box<dyn PipelineHook>,
1252 }
1253 }
1254 };
1255 let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
1256 Ok(pipeline.get_pipelined_cap(&ops))
1257 }
1258 }
1259 }
1260
1261 fn write_target(
1271 &self,
1272 cap: &dyn ClientHook,
1273 target: message_target::Builder,
1274 ) -> Option<Box<dyn ClientHook>> {
1275 if cap.get_brand() == self.get_brand() {
1276 match Client::from_ptr(cap.get_ptr(), self) {
1277 Some(c) => c.write_target(target),
1278 None => unreachable!(),
1279 }
1280 } else {
1281 Some(cap.add_ref())
1282 }
1283 }
1284
1285 fn get_innermost_client(&self, mut client: Box<dyn ClientHook>) -> Box<dyn ClientHook> {
1289 while let Some(inner) = client.get_resolved() {
1290 client = inner;
1291 }
1292 if client.get_brand() == self.get_brand() {
1293 match self.client_downcast_map.borrow().get(&client.get_ptr()) {
1294 Some(c) => Box::new(c.upgrade().expect("dangling client?")),
1295 None => unreachable!(),
1296 }
1297 } else {
1298 client
1299 }
1300 }
1301
/// Builds a promise that waits for `promise` (the resolution of the exported
/// promise `export_id`) and then tells the peer about the outcome.
///
/// On success the export-table entry is re-pointed at the innermost resolved
/// client. If that client is foreign to this connection and is itself still
/// a promise, and this export becomes (or already is) the `exports_by_cap`
/// entry for it, the function recurses on the new promise instead of
/// sending anything. Otherwise a `Resolve` message carrying the capability
/// descriptor is sent. On failure a `Resolve` message carrying the
/// exception is sent.
///
/// The returned promise is passed through `eagerly_evaluate`.
///
/// # Panics
///
/// The future panics if the connection state has been dropped by the time
/// `promise` completes.
// The `exports` / `exports_by_cap` guards are lexically in scope at the
// `.await` below, but both are explicitly dropped just before it.
#[allow(clippy::await_holding_refcell_ref)]
fn resolve_exported_promise(
    state: &Rc<Self>,
    export_id: ExportId,
    promise: Promise<Box<dyn ClientHook>, Error>,
) -> Promise<(), Error> {
    // Only a weak reference is captured by the future.
    let weak_connection_state = Rc::downgrade(state);
    state.eagerly_evaluate(Promise::from_future(async move {
        let resolution_result = promise.await;
        let connection_state = weak_connection_state
            .upgrade()
            .expect("dangling connection state?");

        match resolution_result {
            Ok(resolution) => {
                let resolution = connection_state.get_innermost_client(resolution.clone());

                let brand = resolution.get_brand();

                // Update the export table to point at the resolution.
                let mut exports = connection_state.exports.borrow_mut();
                let Some(exp) = exports.find(export_id) else {
                    return Err(Error::failed("export table entry not found".to_string()));
                };

                // The old client hook is about to be replaced, so its
                // reverse-map entry (if this export owned it) is removed.
                if exp.canonical {
                    connection_state
                        .exports_by_cap
                        .borrow_mut()
                        .remove(&exp.client_hook.get_ptr());
                }
                exp.client_hook = resolution.clone();

                exp.canonical = false;

                if brand != connection_state.get_brand() {
                    // The resolution is not one of this connection's clients.
                    // If it can resolve further, try to keep waiting on it.
                    if let Some(promise) = resolution.when_more_resolved() {
                        let mut exports_by_cap = connection_state.exports_by_cap.borrow_mut();

                        // Claim the reverse-map slot for the new hook unless
                        // another export already holds it.
                        let replacement_export_id =
                            match exports_by_cap.entry(exp.client_hook.get_ptr()) {
                                hash_map::Entry::Occupied(occ) => *occ.get(),
                                hash_map::Entry::Vacant(vac) => {
                                    vac.insert(export_id);
                                    export_id
                                }
                            };
                        if replacement_export_id == export_id {
                            // This export is the canonical entry for the new
                            // promise: wait for it instead of sending Resolve.
                            exp.canonical = true;
                            // Release both RefCell guards before awaiting.
                            drop(exports);
                            drop(exports_by_cap);
                            return Self::resolve_exported_promise(
                                &connection_state,
                                export_id,
                                promise,
                            )
                            .await;
                        }
                    }
                }
                // `write_descriptor` below borrows the export table itself.
                drop(exports);

                // Send a Resolve message carrying the capability.
                let mut message = connection_state.new_outgoing_message(15)?;
                {
                    let root: message::Builder = message.get_body()?.get_as()?;
                    let mut resolve = root.init_resolve();
                    resolve.set_promise_id(export_id);
                    let _export = Self::write_descriptor(
                        &connection_state,
                        resolution,
                        resolve.init_cap(),
                    )?;
                }
                // The value returned by `send()` is deliberately ignored.
                let _ = message.send();
                Ok(())
            }
            Err(e) => {
                // Send a Resolve message carrying the exception.
                let mut message = connection_state.new_outgoing_message(15)?;
                {
                    let root: message::Builder = message.get_body()?.get_as()?;
                    let mut resolve = root.init_resolve();
                    resolve.set_promise_id(export_id);
                    from_error(&e, resolve.init_exception());
                }
                let _ = message.send();
                Ok(())
            }
        }
    }))
}
1420
1421 fn write_descriptor(
1422 state: &Rc<Self>,
1423 mut inner: Box<dyn ClientHook>,
1424 mut descriptor: cap_descriptor::Builder,
1425 ) -> ::capnp::Result<Option<ExportId>> {
1426 while let Some(resolved) = inner.get_resolved() {
1428 inner = resolved;
1429 }
1430 if inner.get_brand() == state.get_brand() {
1431 let Some(c) = Client::from_ptr(inner.get_ptr(), state) else {
1432 unreachable!()
1433 };
1434 Ok(c.write_descriptor(descriptor))
1435 } else {
1436 let ptr = inner.get_ptr();
1437 let contains_key = state.exports_by_cap.borrow().contains_key(&ptr);
1438 if contains_key {
1439 let export_id = state.exports_by_cap.borrow()[&ptr];
1441 descriptor.set_sender_hosted(export_id);
1442 state.exports.borrow_mut().find(export_id).unwrap().refcount += 1;
1444 Ok(Some(export_id))
1445 } else {
1446 let mut exp = Export::new(inner.clone());
1449 exp.canonical = true;
1450 let export_id = state.exports.borrow_mut().push(exp);
1451 state.exports_by_cap.borrow_mut().insert(ptr, export_id);
1452 match inner.when_more_resolved() {
1453 Some(wrapped) => {
1454 if let Some(exp) = state.exports.borrow_mut().find(export_id) {
1456 exp.resolve_op =
1457 Self::resolve_exported_promise(state, export_id, wrapped);
1458 }
1459 descriptor.set_sender_promise(export_id);
1460 }
1461 None => {
1462 descriptor.set_sender_hosted(export_id);
1463 }
1464 }
1465 Ok(Some(export_id))
1466 }
1467 }
1468 }
1469
1470 fn write_descriptors(
1471 state: &Rc<Self>,
1472 cap_table: &[Option<Box<dyn ClientHook>>],
1473 payload: payload::Builder,
1474 ) -> Vec<ExportId> {
1475 let mut cap_table_builder = payload.init_cap_table(cap_table.len() as u32);
1476 let mut exports = Vec::new();
1477 for (idx, value) in cap_table.iter().enumerate() {
1478 match value {
1479 Some(cap) => {
1480 if let Some(export_id) = Self::write_descriptor(
1481 state,
1482 cap.clone(),
1483 cap_table_builder.reborrow().get(idx as u32),
1484 )
1485 .unwrap()
1486 {
1487 exports.push(export_id);
1488 }
1489 }
1490 None => {
1491 cap_table_builder.reborrow().get(idx as u32).set_none(());
1492 }
1493 }
1494 }
1495 exports
1496 }
1497
1498 fn import(state: &Rc<Self>, import_id: ImportId, is_promise: bool) -> Box<dyn ClientHook> {
1499 let import_client = {
1500 match state.imports.borrow_mut().slots.entry(import_id) {
1501 hash_map::Entry::Occupied(occ) => occ
1502 .get()
1503 .import_client
1504 .upgrade()
1505 .expect("dangling ref to import client?"),
1506 hash_map::Entry::Vacant(v) => {
1507 let import_client = ImportClient::new(state, import_id);
1508 v.insert(Import::new(&import_client));
1509 import_client
1510 }
1511 }
1512 };
1513
1514 import_client.borrow_mut().add_remote_ref();
1516
1517 let mut tmp = state.imports.borrow_mut();
1518 let Some(import) = tmp.slots.get_mut(&import_id) else {
1519 unreachable!()
1520 };
1521
1522 if is_promise {
1523 match &import.app_client {
1525 Some(c) => {
1526 Box::new(c.upgrade().expect("dangling client ref?"))
1528 }
1529 None => {
1530 let client: Box<Client<VatId>> = Box::new(import_client.into());
1533 let client: Box<dyn ClientHook> = client;
1534
1535 let client = PromiseClient::new(state, client, Some(import_id));
1545
1546 import.promise_client_to_resolve = Some(Rc::downgrade(&client));
1547 let client: Box<Client<VatId>> = Box::new(client.into());
1548 import.app_client = Some(client.downgrade());
1549 client
1550 }
1551 }
1552 } else {
1553 let client: Box<Client<VatId>> = Box::new(import_client.into());
1554 import.app_client = Some(client.downgrade());
1555 client
1556 }
1557 }
1558
1559 fn receive_cap(
1560 state: &Rc<Self>,
1561 descriptor: cap_descriptor::Reader,
1562 ) -> ::capnp::Result<Option<Box<dyn ClientHook>>> {
1563 match descriptor.which()? {
1564 cap_descriptor::None(()) => Ok(None),
1565 cap_descriptor::SenderHosted(sender_hosted) => {
1566 Ok(Some(Self::import(state, sender_hosted, false)))
1567 }
1568 cap_descriptor::SenderPromise(sender_promise) => {
1569 Ok(Some(Self::import(state, sender_promise, true)))
1570 }
1571 cap_descriptor::ReceiverHosted(receiver_hosted) => {
1572 if let Some(exp) = state.exports.borrow_mut().find(receiver_hosted) {
1573 Ok(Some(exp.client_hook.add_ref()))
1574 } else {
1575 Ok(Some(broken::new_cap(Error::failed(
1576 "invalid 'receiverHosted' export ID".to_string(),
1577 ))))
1578 }
1579 }
1580 cap_descriptor::ReceiverAnswer(receiver_answer) => {
1581 let promised_answer = receiver_answer?;
1582 let question_id = promised_answer.get_question_id();
1583 if let Some(answer) = state.answers.borrow().slots.get(&question_id) {
1584 if let Some(ref pipeline) = answer.pipeline {
1585 let ops = to_pipeline_ops(promised_answer.get_transform()?)?;
1586 return Ok(Some(pipeline.get_pipelined_cap(&ops)));
1587 }
1588 }
1589 Ok(Some(broken::new_cap(Error::failed(
1590 "invalid 'receiver answer'".to_string(),
1591 ))))
1592 }
1593 cap_descriptor::ThirdPartyHosted(_third_party_hosted) => Err(Error::unimplemented(
1594 "ThirdPartyHosted caps are not supported.".to_string(),
1595 )),
1596 }
1597 }
1598
1599 fn receive_caps(
1600 state: &Rc<Self>,
1601 cap_table: ::capnp::struct_list::Reader<cap_descriptor::Owned>,
1602 ) -> ::capnp::Result<Vec<Option<Box<dyn ClientHook>>>> {
1603 let mut result = Vec::new();
1604 for idx in 0..cap_table.len() {
1605 result.push(Self::receive_cap(state, cap_table.get(idx))?);
1606 }
1607 Ok(result)
1608 }
1609}
1610
/// Progress of a [`Disconnector`] future.
enum DisconnectorState {
    /// The future has not been polled yet; the first poll calls
    /// `disconnect()` and moves to `Disconnecting`.
    New,
    /// Disconnect was requested; polls stay here while the shared
    /// connection-state slot is still `Some`.
    Disconnecting,
    /// The shared connection-state slot was observed to be `None`; polls
    /// return `Ready(Ok(()))`.
    Disconnected,
}
1616
/// A future that requests a disconnect of the connection on its first poll
/// and completes once the shared connection-state slot has become `None`.
pub struct Disconnector<VatId>
where
    VatId: 'static,
{
    /// Shared slot holding the connection state; `None` is treated as
    /// "disconnected" by `poll`.
    connection_state: Rc<RefCell<Option<Rc<ConnectionState<VatId>>>>>,
    /// Where this future is in its New -> Disconnecting -> Disconnected
    /// progression.
    state: DisconnectorState,
}
1625
1626impl<VatId> Disconnector<VatId> {
1627 pub fn new(connection_state: Rc<RefCell<Option<Rc<ConnectionState<VatId>>>>>) -> Self {
1628 Self {
1629 connection_state,
1630 state: DisconnectorState::New,
1631 }
1632 }
1633 fn disconnect(&self) {
1634 if let Some(ref state) = *(self.connection_state.borrow()) {
1635 state.disconnect(::capnp::Error::disconnected(
1636 "client requested disconnect".to_owned(),
1637 ));
1638 }
1639 }
1640}
1641
1642impl<VatId> Future for Disconnector<VatId>
1643where
1644 VatId: 'static,
1645{
1646 type Output = Result<(), capnp::Error>;
1647
1648 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
1649 self.state = match self.state {
1650 DisconnectorState::New => {
1651 self.disconnect();
1652 DisconnectorState::Disconnecting
1653 }
1654 DisconnectorState::Disconnecting => {
1655 if self.connection_state.borrow().is_some() {
1656 DisconnectorState::Disconnecting
1657 } else {
1658 DisconnectorState::Disconnected
1659 }
1660 }
1661 DisconnectorState::Disconnected => DisconnectorState::Disconnected,
1662 };
1663 match self.state {
1664 DisconnectorState::New => unreachable!(),
1665 DisconnectorState::Disconnecting => {
1666 cx.waker().wake_by_ref();
1667 Poll::Pending
1668 }
1669 DisconnectorState::Disconnected => Poll::Ready(Ok(())),
1670 }
1671 }
1672}
1673
/// Data backing a response that arrived as an RPC `Return` message.
struct ResponseState<VatId>
where
    VatId: 'static,
{
    /// Not read in the visible code; presumably held to keep the connection
    /// state alive while the response exists (confirm).
    _connection_state: Rc<ConnectionState<VatId>>,
    /// The incoming message whose `Return` payload holds the results.
    message: Box<dyn crate::IncomingMessage>,
    /// Capabilities imbued into the payload when it is read.
    cap_table: Vec<Option<Box<dyn ClientHook>>>,
    /// Not read in the visible code; presumably held to keep the question
    /// alive while the response exists (confirm).
    _question_ref: Rc<RefCell<QuestionRef<VatId>>>,
}
1683
/// The two sources a [`Response`] can read its content from.
enum ResponseVariant<VatId>
where
    VatId: 'static,
{
    /// Results carried by an incoming RPC message.
    Rpc(ResponseState<VatId>),
    /// Results provided by a local `ResultsDoneHook`.
    LocallyRedirected(Box<dyn ResultsDoneHook>),
}
1691
/// A call response; cloning shares the same underlying variant.
struct Response<VatId>
where
    VatId: 'static,
{
    /// Reference-counted so that clones share the same response data.
    variant: Rc<ResponseVariant<VatId>>,
}
1698
1699impl<VatId> Response<VatId> {
1700 fn new(
1701 connection_state: Rc<ConnectionState<VatId>>,
1702 question_ref: Rc<RefCell<QuestionRef<VatId>>>,
1703 message: Box<dyn crate::IncomingMessage>,
1704 cap_table_array: Vec<Option<Box<dyn ClientHook>>>,
1705 ) -> Self {
1706 Self {
1707 variant: Rc::new(ResponseVariant::Rpc(ResponseState {
1708 _connection_state: connection_state,
1709 message,
1710 cap_table: cap_table_array,
1711 _question_ref: question_ref,
1712 })),
1713 }
1714 }
1715 fn redirected(results_done: Box<dyn ResultsDoneHook>) -> Self {
1716 Self {
1717 variant: Rc::new(ResponseVariant::LocallyRedirected(results_done)),
1718 }
1719 }
1720}
1721
1722impl<VatId> Clone for Response<VatId> {
1723 fn clone(&self) -> Self {
1724 Self {
1725 variant: self.variant.clone(),
1726 }
1727 }
1728}
1729
1730impl<VatId> ResponseHook for Response<VatId> {
1731 fn get(&self) -> ::capnp::Result<any_pointer::Reader<'_>> {
1732 match *self.variant {
1733 ResponseVariant::Rpc(ref state) => {
1734 match state
1735 .message
1736 .get_body()?
1737 .get_as::<message::Reader>()?
1738 .which()?
1739 {
1740 message::Return(Ok(ret)) => match ret.which()? {
1741 return_::Results(Ok(mut payload)) => {
1742 use ::capnp::traits::Imbue;
1743 payload.imbue(&state.cap_table);
1744 Ok(payload.get_content())
1745 }
1746 _ => unreachable!(),
1747 },
1748 _ => unreachable!(),
1749 }
1750 }
1751 ResponseVariant::LocallyRedirected(ref results_done) => results_done.get(),
1752 }
1753 }
1754}
1755
/// An outgoing call under construction on a connection.
struct Request<VatId>
where
    VatId: 'static,
{
    /// The connection the call will be sent on.
    connection_state: Rc<ConnectionState<VatId>>,
    /// The client the call is addressed to.
    target: Client<VatId>,
    /// The outgoing message holding the `Call` being built.
    message: Box<dyn crate::OutgoingMessage>,
    /// Capabilities referenced by the params; imbued into the params
    /// builder by `get` and turned into descriptors when sending.
    cap_table: Vec<Option<Box<dyn ClientHook>>>,
}
1765
1766fn get_call(message: &mut Box<dyn crate::OutgoingMessage>) -> ::capnp::Result<call::Builder<'_>> {
1767 let message_root: message::Builder = message.get_body()?.get_as()?;
1768 match message_root.which()? {
1769 message::Call(call) => call,
1770 _ => {
1771 unimplemented!()
1772 }
1773 }
1774}
1775
1776impl<VatId> Request<VatId>
1777where
1778 VatId: 'static,
1779{
1780 fn new(
1781 connection_state: Rc<ConnectionState<VatId>>,
1782 _size_hint: Option<::capnp::MessageSize>,
1783 target: Client<VatId>,
1784 ) -> ::capnp::Result<Self> {
1785 let message = connection_state.new_outgoing_message(1024)?;
1786 Ok(Self {
1787 connection_state,
1788 target,
1789 message,
1790 cap_table: Vec::new(),
1791 })
1792 }
1793
1794 fn init_call(&mut self) -> call::Builder<'_> {
1795 let message_root: message::Builder = self.message.get_body().unwrap().get_as().unwrap();
1796 message_root.init_call()
1797 }
1798
1799 fn send_internal(
1800 connection_state: &Rc<ConnectionState<VatId>>,
1801 mut message: Box<dyn crate::OutgoingMessage>,
1802 cap_table: &[Option<Box<dyn ClientHook>>],
1803 is_tail_call: bool,
1804 ) -> (
1805 Rc<RefCell<QuestionRef<VatId>>>,
1806 Promise<Response<VatId>, Error>,
1807 ) {
1808 let exports = ConnectionState::write_descriptors(
1810 connection_state,
1811 cap_table,
1812 get_call(&mut message).unwrap().get_params().unwrap(),
1813 );
1814
1815 let mut question = Question::<VatId>::new();
1817 question.is_awaiting_return = true;
1818 question.param_exports = exports;
1819 question.is_tail_call = is_tail_call;
1820
1821 let question_id = connection_state.questions.borrow_mut().push(question);
1822 {
1823 let mut call_builder: call::Builder = get_call(&mut message).unwrap();
1824 call_builder.reborrow().set_question_id(question_id);
1826 if is_tail_call {
1827 call_builder.get_send_results_to().set_yourself(());
1828 }
1829 }
1830 let _ = message.send();
1831 let (fulfiller, promise) = oneshot::channel::<Promise<Response<VatId>, Error>>();
1833 let promise = promise.map_err(crate::canceled_to_error).and_then(|x| x);
1834 let question_ref = Rc::new(RefCell::new(QuestionRef::new(
1835 connection_state.clone(),
1836 question_id,
1837 fulfiller,
1838 )));
1839
1840 match connection_state.questions.borrow_mut().slots[question_id as usize] {
1841 Some(ref mut q) => {
1842 q.self_ref = Some(Rc::downgrade(&question_ref));
1843 }
1844 None => unreachable!(),
1845 }
1846
1847 let promise = promise.attach(question_ref.clone());
1848 let promise2 = Promise::from_future(promise);
1849
1850 (question_ref, promise2)
1851 }
1852
1853 fn send_streaming_internal(
1854 connection_state: &Rc<ConnectionState<VatId>>,
1855 mut message: Box<dyn crate::OutgoingMessage>,
1856 cap_table: &[Option<Box<dyn ClientHook>>],
1857 flow: Rc<RefCell<Option<Box<dyn crate::FlowController>>>>,
1858 ) -> Promise<(), Error> {
1859 let exports = ConnectionState::write_descriptors(
1861 connection_state,
1862 cap_table,
1863 get_call(&mut message).unwrap().get_params().unwrap(),
1864 );
1865
1866 let mut question = Question::<VatId>::new();
1868 question.is_awaiting_return = true;
1869 question.param_exports = exports;
1870 question.is_tail_call = false;
1871
1872 let question_id = connection_state.questions.borrow_mut().push(question);
1873 {
1874 let mut call_builder: call::Builder = get_call(&mut message).unwrap();
1875 call_builder.reborrow().set_question_id(question_id);
1876 }
1877
1878 let (fulfiller, promise) = oneshot::channel::<Promise<Response<VatId>, Error>>();
1880 let promise = promise.map_err(crate::canceled_to_error).and_then(|x| x);
1881 let question_ref = Rc::new(RefCell::new(QuestionRef::new(
1882 connection_state.clone(),
1883 question_id,
1884 fulfiller,
1885 )));
1886
1887 match connection_state.questions.borrow_mut().slots[question_id as usize] {
1888 Some(ref mut q) => {
1889 q.self_ref = Some(Rc::downgrade(&question_ref));
1890 }
1891 None => unreachable!(),
1892 }
1893 let promise = promise.attach(question_ref.clone());
1894
1895 let mut flow = flow.borrow_mut();
1896 if flow.is_none() {
1897 match connection_state.connection.borrow_mut().as_mut() {
1898 Err(_) => return Promise::err(Error::failed("no connection".into())),
1899 Ok(connection) => {
1900 let (s, p) = connection.new_stream();
1901 connection_state.add_task(p);
1902 *flow = Some(s);
1903 }
1904 };
1905 }
1906 let Some(ref mut flow) = *flow else {
1907 unreachable!()
1908 };
1909 flow.send(
1910 message,
1911 Promise::from_future(async move {
1912 let _ = promise.await?;
1913 Ok(())
1914 }),
1915 )
1916 }
1917}
1918
1919impl<VatId> RequestHook for Request<VatId> {
1920 fn get(&mut self) -> any_pointer::Builder<'_> {
1921 use ::capnp::traits::ImbueMut;
1922 let mut builder = get_call(&mut self.message)
1923 .unwrap()
1924 .get_params()
1925 .unwrap()
1926 .get_content();
1927 builder.imbue_mut(&mut self.cap_table);
1928 builder
1929 }
1930 fn get_brand<'a>(&self) -> usize {
1931 self.connection_state.get_brand()
1932 }
1933 fn send(self: Box<Self>) -> ::capnp::capability::RemotePromise<any_pointer::Owned> {
1934 let tmp = *self;
1935 let Self {
1936 connection_state,
1937 target,
1938 mut message,
1939 cap_table,
1940 } = tmp;
1941 let write_target_result = {
1942 let call_builder: call::Builder = get_call(&mut message).unwrap();
1943 target.write_target(call_builder.get_target().unwrap())
1944 };
1945 if let Some(redirect) = write_target_result {
1946 let mut call_builder: call::Builder = get_call(&mut message).unwrap();
1949 let mut replacement = redirect.new_call(
1950 call_builder.reborrow().get_interface_id(),
1951 call_builder.reborrow().get_method_id(),
1952 None,
1953 );
1954
1955 replacement
1956 .set(
1957 call_builder
1958 .get_params()
1959 .unwrap()
1960 .get_content()
1961 .into_reader(),
1962 )
1963 .unwrap();
1964 return replacement.send();
1965 }
1966 let (question_ref, promise) =
1967 Self::send_internal(&connection_state, message, &cap_table, false);
1968 let forked_promise1 = promise.shared();
1969 let forked_promise2 = forked_promise1.clone();
1970
1971 let pipeline = Pipeline::new(
1973 &connection_state,
1974 question_ref,
1975 Some(Promise::from_future(forked_promise1)),
1976 );
1977
1978 let resolved = pipeline.when_resolved();
1979
1980 let forked_promise2 = resolved.map(|_| Ok(())).and_then(|()| forked_promise2);
1981
1982 let app_promise = Promise::from_future(
1983 forked_promise2
1984 .map_ok(|response| ::capnp::capability::Response::new(Box::new(response))),
1985 );
1986
1987 ::capnp::capability::RemotePromise {
1988 promise: app_promise,
1989 pipeline: any_pointer::Pipeline::new(Box::new(pipeline)),
1990 }
1991 }
1992 fn send_streaming(self: Box<Self>) -> Promise<(), Error> {
1993 let tmp = *self;
1994 let Self {
1995 connection_state,
1996 target,
1997 mut message,
1998 cap_table,
1999 } = tmp;
2000 let write_target_result = {
2001 let call_builder: call::Builder = get_call(&mut message).unwrap();
2002 target.write_target(call_builder.get_target().unwrap())
2003 };
2004 if let Some(redirect) = write_target_result {
2005 let mut call_builder: call::Builder = get_call(&mut message).unwrap();
2008 let mut replacement = redirect.new_call(
2009 call_builder.reborrow().get_interface_id(),
2010 call_builder.reborrow().get_method_id(),
2011 None,
2012 );
2013
2014 replacement
2015 .set(
2016 call_builder
2017 .get_params()
2018 .unwrap()
2019 .get_content()
2020 .into_reader(),
2021 )
2022 .unwrap();
2023 return replacement.hook.send_streaming();
2024 }
2025 Self::send_streaming_internal(
2026 &connection_state,
2027 message,
2028 &cap_table,
2029 target.flow_controller,
2030 )
2031 }
2032 fn tail_send(self: Box<Self>) -> Option<(u32, Promise<(), Error>, Box<dyn PipelineHook>)> {
2033 let tmp = *self;
2034 let Self {
2035 connection_state,
2036 target,
2037 mut message,
2038 cap_table,
2039 } = tmp;
2040
2041 if connection_state.connection.borrow().is_err() {
2042 return None;
2044 }
2045
2046 let write_target_result = {
2047 let call_builder: crate::rpc_capnp::call::Builder = get_call(&mut message).unwrap();
2048 target.write_target(call_builder.get_target().unwrap())
2049 };
2050
2051 let (question_ref, promise) = match write_target_result {
2052 Some(_redirect) => {
2053 return None;
2054 }
2055 None => Self::send_internal(&connection_state, message, &cap_table, true),
2056 };
2057
2058 let promise = promise.map_ok(|_response| {
2059 unimplemented!()
2062 });
2063
2064 let question_id = question_ref.borrow().id;
2065 let pipeline = Pipeline::never_done(connection_state, question_ref);
2066
2067 Some((
2068 question_id,
2069 Promise::from_future(promise),
2070 Box::new(pipeline),
2071 ))
2072 }
2073}
2074
/// Resolution status of a [`Pipeline`].
enum PipelineVariant<VatId>
where
    VatId: 'static,
{
    /// No response yet; pipelined capabilities are created against the
    /// question.
    Waiting(Rc<RefCell<QuestionRef<VatId>>>),
    /// The call returned this response; pipelined capabilities are read
    /// from its content.
    Resolved(Response<VatId>),
    /// The call failed; pipelined capabilities are broken with this error.
    Broken(Error),
}
2083
/// Shared mutable state behind a [`Pipeline`] and its `add_ref` copies.
struct PipelineState<VatId>
where
    VatId: 'static,
{
    /// Current resolution status.
    variant: PipelineVariant<VatId>,
    /// Shared response promise, set when the pipeline was created with a
    /// `redirect_later` promise. Only its presence is inspected in the
    /// visible code: when present, pipelined caps are wrapped in promise
    /// clients that get resolved later.
    redirect_later: Option<RefCell<futures::future::Shared<Promise<Response<VatId>, Error>>>>,
    /// The connection the originating call was sent on.
    connection_state: Rc<ConnectionState<VatId>>,

    /// The eagerly evaluated promise that calls `PipelineState::resolve`.
    /// Never read; presumably stored only to keep that promise alive
    /// (confirm).
    #[allow(dead_code)]
    resolve_self_promise: Promise<(), Error>,

    /// Promise clients handed out while waiting, paired with the pipeline
    /// ops that select their capability; drained by `resolve`.
    promise_clients_to_resolve: RefCell<
        crate::sender_queue::SenderQueue<
            (Weak<RefCell<PromiseClient<VatId>>>, Vec<PipelineOp>),
            (),
        >,
    >,
    /// Waiters registered through `Pipeline::when_resolved`; notified at the
    /// end of `resolve`.
    resolution_waiters: crate::sender_queue::SenderQueue<(), ()>,
}
2103
2104impl<VatId> PipelineState<VatId>
2105where
2106 VatId: 'static,
2107{
2108 fn resolve(state: &Rc<RefCell<Self>>, response: Result<Response<VatId>, Error>) {
2109 let to_resolve = {
2110 let tmp = state.borrow();
2111 let r = tmp.promise_clients_to_resolve.borrow_mut().drain();
2112 r
2113 };
2114 for ((c, ops), _) in to_resolve {
2115 let resolved = match response.clone() {
2116 Ok(v) => match v.get() {
2117 Ok(x) => x.get_pipelined_cap(&ops),
2118 Err(e) => Err(e),
2119 },
2120 Err(e) => Err(e),
2121 };
2122 if let Some(c) = c.upgrade() {
2123 c.borrow_mut().resolve(resolved);
2124 }
2125 }
2126
2127 let new_variant = match response {
2128 Ok(r) => PipelineVariant::Resolved(r),
2129 Err(e) => PipelineVariant::Broken(e),
2130 };
2131 let _old_variant = mem::replace(&mut state.borrow_mut().variant, new_variant);
2132
2133 let waiters = state.borrow_mut().resolution_waiters.drain();
2134 for (_, waiter) in waiters {
2135 let _ = waiter.send(());
2136 }
2137 }
2138}
2139
/// Handle to the pipeline of an outstanding call; `add_ref` copies share
/// the same state.
struct Pipeline<VatId>
where
    VatId: 'static,
{
    /// State shared between all references to this pipeline.
    state: Rc<RefCell<PipelineState<VatId>>>,
}
2146
2147impl<VatId> Pipeline<VatId> {
2148 fn new(
2149 connection_state: &Rc<ConnectionState<VatId>>,
2150 question_ref: Rc<RefCell<QuestionRef<VatId>>>,
2151 redirect_later: Option<Promise<Response<VatId>, ::capnp::Error>>,
2152 ) -> Self {
2153 let state = Rc::new(RefCell::new(PipelineState {
2154 variant: PipelineVariant::Waiting(question_ref),
2155 connection_state: connection_state.clone(),
2156 redirect_later: None,
2157 resolve_self_promise: Promise::from_future(future::pending()),
2158 promise_clients_to_resolve: RefCell::new(crate::sender_queue::SenderQueue::new()),
2159 resolution_waiters: crate::sender_queue::SenderQueue::new(),
2160 }));
2161 if let Some(redirect_later_promise) = redirect_later {
2162 let fork = redirect_later_promise.shared();
2163 let this = Rc::downgrade(&state);
2164 let resolve_self_promise =
2165 connection_state.eagerly_evaluate(fork.clone().then(move |response| {
2166 let Some(state) = this.upgrade() else {
2167 return Promise::err(Error::failed("dangling reference to this".into()));
2168 };
2169 PipelineState::resolve(&state, response);
2170 Promise::ok(())
2171 }));
2172
2173 state.borrow_mut().resolve_self_promise = resolve_self_promise;
2174 state.borrow_mut().redirect_later = Some(RefCell::new(fork));
2175 }
2176 Self { state }
2177 }
2178
2179 fn when_resolved(&self) -> Promise<(), Error> {
2180 self.state.borrow_mut().resolution_waiters.push(())
2181 }
2182
2183 fn never_done(
2184 connection_state: Rc<ConnectionState<VatId>>,
2185 question_ref: Rc<RefCell<QuestionRef<VatId>>>,
2186 ) -> Self {
2187 let state = Rc::new(RefCell::new(PipelineState {
2188 variant: PipelineVariant::Waiting(question_ref),
2189 connection_state,
2190 redirect_later: None,
2191 resolve_self_promise: Promise::from_future(future::pending()),
2192 promise_clients_to_resolve: RefCell::new(crate::sender_queue::SenderQueue::new()),
2193 resolution_waiters: crate::sender_queue::SenderQueue::new(),
2194 }));
2195
2196 Self { state }
2197 }
2198}
2199
2200impl<VatId> PipelineHook for Pipeline<VatId> {
2201 fn add_ref(&self) -> Box<dyn PipelineHook> {
2202 Box::new(Self {
2203 state: self.state.clone(),
2204 })
2205 }
2206 fn get_pipelined_cap(&self, ops: &[PipelineOp]) -> Box<dyn ClientHook> {
2207 self.get_pipelined_cap_move(ops.into())
2208 }
2209 fn get_pipelined_cap_move(&self, ops: Vec<PipelineOp>) -> Box<dyn ClientHook> {
2210 match *self.state.borrow() {
2211 PipelineState {
2212 variant: PipelineVariant::Waiting(ref question_ref),
2213 ref connection_state,
2214 ref redirect_later,
2215 ref promise_clients_to_resolve,
2216 ..
2217 } => {
2218 let pipeline_client =
2220 PipelineClient::new(connection_state, question_ref.clone(), ops.clone());
2221
2222 match redirect_later {
2223 Some(_r) => {
2224 let client: Client<VatId> = pipeline_client.into();
2225 let promise_client =
2226 PromiseClient::new(connection_state, Box::new(client), None);
2227 promise_clients_to_resolve
2228 .borrow_mut()
2229 .push_detach((Rc::downgrade(&promise_client), ops));
2230 let result: Client<VatId> = promise_client.into();
2231 Box::new(result)
2232 }
2233 None => {
2234 let client: Client<VatId> = pipeline_client.into();
2236 Box::new(client)
2237 }
2238 }
2239 }
2240 PipelineState {
2241 variant: PipelineVariant::Resolved(ref response),
2242 ..
2243 } => response.get().unwrap().get_pipelined_cap(&ops[..]).unwrap(),
2244 PipelineState {
2245 variant: PipelineVariant::Broken(ref e),
2246 ..
2247 } => broken::new_cap(e.clone()),
2248 }
2249 }
2250}
2251
/// Parameters of an incoming call.
pub struct Params {
    /// The incoming message containing the `Call`.
    request: Box<dyn crate::IncomingMessage>,
    /// Capabilities imbued into the params content when it is read.
    cap_table: Vec<Option<Box<dyn ClientHook>>>,
}
2256
2257impl Params {
2258 fn new(
2259 request: Box<dyn crate::IncomingMessage>,
2260 cap_table: Vec<Option<Box<dyn ClientHook>>>,
2261 ) -> Self {
2262 Self { request, cap_table }
2263 }
2264}
2265
2266impl ParamsHook for Params {
2267 fn get(&self) -> ::capnp::Result<any_pointer::Reader<'_>> {
2268 let root: message::Reader = self.request.get_body()?.get_as()?;
2269 let message::Call(call) = root.which()? else {
2270 unreachable!()
2271 };
2272 use ::capnp::traits::Imbue;
2273 let mut content = call?.get_params()?.get_content();
2274 content.imbue(&self.cap_table);
2275 Ok(content)
2276 }
2277}
2278
/// Where the results of an incoming call are being built.
enum ResultsVariant {
    /// Results are built directly inside an outgoing `Return` message,
    /// together with the cap table imbued into its content.
    Rpc(
        Box<dyn crate::OutgoingMessage>,
        Vec<Option<Box<dyn ClientHook>>>,
    ),
    /// Results are built in a standalone heap-allocated message, together
    /// with the cap table imbued into its root.
    LocallyRedirected(
        ::capnp::message::Builder<::capnp::message::HeapAllocator>,
        Vec<Option<Box<dyn ClientHook>>>,
    ),
}
2289
/// The part of [`Results`] that is handed over through the results-done
/// channel when the `Results` is dropped.
struct ResultsInner<VatId>
where
    VatId: 'static,
{
    /// The connection the call arrived on.
    connection_state: Rc<ConnectionState<VatId>>,
    /// The results storage; `None` until `ensure_initialized` runs.
    variant: Option<ResultsVariant>,
    /// When true, results are always built in a local message instead of an
    /// outgoing `Return`.
    redirect_results: bool,
    /// ID written into the `Return` message as its answer ID.
    answer_id: AnswerId,
    /// Shared flag; not read in the visible code. Presumably set once the
    /// peer's `Finish` for this answer arrives (confirm).
    finish_received: Rc<Cell<bool>>,
    /// Sender used by `set_pipeline`; taken on first use.
    pipeline_sender: Option<queued::PipelineInnerSender>,
}
2301
2302impl<VatId> ResultsInner<VatId>
2303where
2304 VatId: 'static,
2305{
2306 fn ensure_initialized(&mut self) {
2307 let answer_id = self.answer_id;
2308 if self.variant.is_none() {
2309 match (
2310 self.redirect_results,
2311 self.connection_state.connection.borrow_mut().as_mut(),
2312 ) {
2313 (false, Ok(c)) => {
2314 let mut message = c.new_outgoing_message(100); {
2317 let root: message::Builder = message.get_body().unwrap().init_as();
2318 let mut ret = root.init_return();
2319 ret.set_answer_id(answer_id);
2320 ret.set_release_param_caps(false);
2321 }
2322 self.variant = Some(ResultsVariant::Rpc(message, Vec::new()));
2323 }
2324 _ => {
2325 self.variant = Some(ResultsVariant::LocallyRedirected(
2326 ::capnp::message::Builder::new_default(),
2327 Vec::new(),
2328 ));
2329 }
2330 }
2331 }
2332 }
2333}
2334
/// Results builder for an incoming call.
///
/// When dropped with its contents still in place, the inner state is sent
/// through `results_done_fulfiller`.
pub struct Results<VatId>
where
    VatId: 'static,
{
    /// The results state; `None` once it has been taken out.
    inner: Option<ResultsInner<VatId>>,
    /// Channel on which `inner` is delivered; `None` once it has been taken
    /// out.
    results_done_fulfiller: Option<oneshot::Sender<ResultsInner<VatId>>>,
}
2343
2344impl<VatId> Results<VatId>
2345where
2346 VatId: 'static,
2347{
2348 fn new(
2349 connection_state: &Rc<ConnectionState<VatId>>,
2350 answer_id: AnswerId,
2351 redirect_results: bool,
2352 fulfiller: oneshot::Sender<ResultsInner<VatId>>,
2353 finish_received: Rc<Cell<bool>>,
2354 pipeline_sender: Option<queued::PipelineInnerSender>,
2355 ) -> Self {
2356 Self {
2357 inner: Some(ResultsInner {
2358 variant: None,
2359 connection_state: connection_state.clone(),
2360 redirect_results,
2361 answer_id,
2362 finish_received,
2363 pipeline_sender,
2364 }),
2365 results_done_fulfiller: Some(fulfiller),
2366 }
2367 }
2368}
2369
2370impl<VatId> Drop for Results<VatId> {
2371 fn drop(&mut self) {
2372 match (self.inner.take(), self.results_done_fulfiller.take()) {
2373 (Some(inner), Some(fulfiller)) => {
2374 let _ = fulfiller.send(inner);
2375 }
2376 (None, None) => (),
2377 _ => unreachable!(),
2378 }
2379 }
2380}
2381
impl<VatId> ResultsHook for Results<VatId> {
    /// Returns a builder for the call's result content.
    ///
    /// Calls `ensure_initialized` on the inner state first, then returns the content pointer
    /// imbued with the capability table stored next to the message.
    ///
    /// Panics if the inner state has already been taken.
    fn get(&mut self) -> ::capnp::Result<any_pointer::Builder<'_>> {
        use ::capnp::traits::ImbueMut;
        let Some(ref mut inner) = self.inner else {
            unreachable!();
        };
        inner.ensure_initialized();
        match inner.variant {
            None => unreachable!(),
            Some(ResultsVariant::Rpc(ref mut message, ref mut cap_table)) => {
                // Walk the outgoing message down to the content of the `Return` results payload.
                let root: message::Builder = message.get_body()?.get_as()?;
                let message::Return(ret) = root.which()? else {
                    unreachable!();
                };
                let return_::Results(payload) = ret?.which()? else {
                    unreachable!()
                };
                let mut content = payload?.get_content();
                content.imbue_mut(cap_table);
                Ok(content)
            }
            Some(ResultsVariant::LocallyRedirected(ref mut message, ref mut cap_table)) => {
                // Redirected results sit directly at the root of a standalone message.
                let mut result: any_pointer::Builder = message.get_root()?;
                result.imbue_mut(cap_table);
                Ok(result)
            }
        }
    }

    /// Completes the pipeline sender with a `local::Pipeline` over a copy of the current results.
    ///
    /// The results are copied into a freshly allocated message (with its own capability table)
    /// whose first segment is sized from the results' `target_size`.
    ///
    /// # Errors
    ///
    /// Fails with "set_pipeline() called twice" if the pipeline sender was already taken, and
    /// propagates any error from reading or copying the results.
    fn set_pipeline(&mut self) -> ::capnp::Result<()> {
        use ::capnp::traits::ImbueMut;
        let root = self.get()?;
        let size = root.target_size()?;
        let mut message2 = capnp::message::Builder::new(
            capnp::message::HeapAllocator::new().first_segment_words(size.word_count as u32 + 1),
        );
        let mut root2: capnp::any_pointer::Builder = message2.init_root();
        let mut cap_table2 = vec![];
        root2.imbue_mut(&mut cap_table2);
        root2.set_as(root.into_reader())?;
        let hook =
            Box::new(local::ResultsDone::new(message2, cap_table2)) as Box<dyn ResultsDoneHook>;
        let Some(ref mut inner) = self.inner else {
            unreachable!();
        };
        // Note: the copy above is made before checking whether the sender is still available.
        let Some(sender) = inner.pipeline_sender.take() else {
            return Err(Error::failed("set_pipeline() called twice".into()));
        };
        sender.complete(Box::new(local::Pipeline::new(hook)));
        Ok(())
    }

    /// Not implemented; always panics.
    fn tail_call(self: Box<Self>, _request: Box<dyn RequestHook>) -> Promise<(), Error> {
        unimplemented!()
    }

    /// Forwards this call's results to `request` as a tail call.
    ///
    /// Only one case is implemented: `request` has the same brand as this connection, the
    /// results are not redirected, and `request.tail_send()` yields a question. In that case a
    /// `Return` carrying `takeFromOtherQuestion` is sent for this answer, the inner state is
    /// handed to the fulfiller, and the tail call's promise and pipeline are returned.
    ///
    /// # Panics
    ///
    /// Panics with `unimplemented!()` in every other case, and if no outgoing message can be
    /// created ("no connection?").
    fn direct_tail_call(
        mut self: Box<Self>,
        request: Box<dyn RequestHook>,
    ) -> (Promise<(), Error>, Box<dyn PipelineHook>) {
        // Taking both fields here means the later `Drop` finds nothing left to send.
        if let (Some(inner), Some(fulfiller)) =
            (self.inner.take(), self.results_done_fulfiller.take())
        {
            let state = inner.connection_state.clone();
            if request.get_brand() == state.get_brand() && !inner.redirect_results {
                if let Some((question_id, promise, pipeline)) = request.tail_send() {
                    // Tell the peer to take this answer's results from the tail call's question.
                    let mut message = state.new_outgoing_message(100).expect("no connection?"); {
                        let root: message::Builder = message.get_body().unwrap().init_as();
                        let mut ret = root.init_return();
                        ret.set_answer_id(inner.answer_id);
                        ret.set_release_param_caps(false);
                        ret.set_take_from_other_question(question_id);
                    }
                    let _ = message.send();

                    let _ = fulfiller.send(inner); return (promise, pipeline);
                }
                unimplemented!()
            } else {
                unimplemented!()
            }
        } else {
            unreachable!();
        }
    }

    /// Not implemented; always panics.
    fn allow_cancellation(&self) {
        unimplemented!()
    }
}
2479
/// Finished results held by `ResultsDone`: a message plus the capability table used to
/// imbue readers of that message.
enum ResultsDoneVariant {
    /// Results stored inside an RPC message; `ResultsDoneHook::get` reads them from the
    /// `Return` results payload of the message root.
    Rpc(
        Rc<::capnp::message::Builder<::capnp::message::HeapAllocator>>,
        Vec<Option<Box<dyn ClientHook>>>,
    ),
    /// Results stored at the root of a standalone message.
    LocallyRedirected(
        ::capnp::message::Builder<::capnp::message::HeapAllocator>,
        Vec<Option<Box<dyn ClientHook>>>,
    ),
}
2490
/// Shared handle to finished call results; implements `ResultsDoneHook`.
struct ResultsDone {
    /// Reference-counted results; `add_ref` clones this `Rc` rather than the data.
    inner: Rc<ResultsDoneVariant>,
}
2494
impl ResultsDone {
    /// Finishes a call: converts the collected `ResultsInner` into a `ResultsDoneHook`, sends
    /// the matching `Return` message where one is needed, and completes `pipeline_sender`.
    ///
    /// * `results_inner` - the state handed back by `Results`, or the error explaining why it
    ///   is unavailable.
    /// * `call_status` - outcome of the call itself.
    /// * `pipeline_sender` - completed with a local pipeline over the results, or with a broken
    ///   pipeline when an error is returned.
    ///
    /// Returns the hook over the finished results, or the error that ended the call. If the
    /// finish flag was already set, the hook is returned regardless of `call_status`.
    fn from_results_inner<VatId>(
        results_inner: Result<ResultsInner<VatId>, Error>,
        call_status: Result<(), Error>,
        pipeline_sender: queued::PipelineInnerSender,
    ) -> Result<Box<dyn ResultsDoneHook>, Error>
    where
        VatId: 'static,
    {
        match results_inner {
            Err(e) => {
                pipeline_sender.complete(Box::new(crate::broken::Pipeline::new(e.clone())));
                Err(e)
            }
            Ok(mut results_inner) => {
                results_inner.ensure_initialized();
                let ResultsInner {
                    connection_state,
                    variant,
                    answer_id,
                    finish_received,
                    ..
                } = results_inner;
                match variant {
                    None => unreachable!(),
                    Some(ResultsVariant::Rpc(mut message, cap_table)) => {
                        match (finish_received.get(), call_status) {
                            // The finish flag is already set: keep the results available
                            // locally and report the call as canceled instead of sending them.
                            (true, _) => {
                                let hook = Box::new(Self::rpc(Rc::new(message.take()), cap_table))
                                    as Box<dyn ResultsDoneHook>;
                                pipeline_sender
                                    .complete(Box::new(local::Pipeline::new(hook.clone())));

                                // Only attempted while the connection is still usable.
                                if let Ok(connection) =
                                    connection_state.connection.borrow_mut().as_mut()
                                {
                                    let mut message = connection.new_outgoing_message(10);
                                    {
                                        let root: message::Builder =
                                            message.get_body()?.get_as()?;
                                        let mut ret = root.init_return();
                                        ret.set_answer_id(answer_id);
                                        ret.set_release_param_caps(false);
                                        ret.set_canceled(());
                                    }
                                    let _ = message.send();
                                }

                                connection_state.answer_has_sent_return(answer_id, Vec::new());
                                Ok(hook)
                            }
                            // Normal success: fill in cap descriptors and send the prepared
                            // `Return` message.
                            (false, Ok(())) => {
                                let exports = {
                                    let root: message::Builder = message.get_body()?.get_as()?;
                                    let message::Return(Ok(mut ret)) = root.which()? else {
                                        unreachable!()
                                    };
                                    // With no capabilities in the results, mark the return as
                                    // not needing a `Finish` and record the finish flag locally.
                                    if cap_table.is_empty() {
                                        ret.set_no_finish_needed(true);
                                        finish_received.set(true);
                                    }
                                    let crate::rpc_capnp::return_::Results(Ok(payload)) =
                                        ret.which()?
                                    else {
                                        unreachable!()
                                    };
                                    ConnectionState::write_descriptors(
                                        &connection_state,
                                        &cap_table,
                                        payload,
                                    )
                                };

                                let (_promise, m) = message.send();
                                connection_state.answer_has_sent_return(answer_id, exports);
                                let hook =
                                    Box::new(Self::rpc(m, cap_table)) as Box<dyn ResultsDoneHook>;
                                pipeline_sender
                                    .complete(Box::new(local::Pipeline::new(hook.clone())));
                                Ok(hook)
                            }
                            // The call failed: send an exception `Return` and break the pipeline.
                            (false, Err(e)) => {
                                if let Ok(connection) =
                                    connection_state.connection.borrow_mut().as_mut()
                                {
                                    let mut message = connection.new_outgoing_message(50); {
                                        let root: message::Builder =
                                            message.get_body()?.get_as()?;
                                        let mut ret = root.init_return();
                                        ret.set_answer_id(answer_id);
                                        ret.set_release_param_caps(false);
                                        let mut exc = ret.init_exception();
                                        from_error(&e, exc.reborrow());
                                    }
                                    let _ = message.send();
                                }
                                connection_state.answer_has_sent_return(answer_id, Vec::new());

                                pipeline_sender
                                    .complete(Box::new(crate::broken::Pipeline::new(e.clone())));

                                Err(e)
                            }
                        }
                    }
                    // Redirected results are not sent from here; they are only wrapped for
                    // local consumers.
                    Some(ResultsVariant::LocallyRedirected(results_done, cap_table)) => {
                        let hook = Box::new(Self::redirected(results_done, cap_table))
                            as Box<dyn ResultsDoneHook>;
                        pipeline_sender
                            .complete(Box::new(crate::local::Pipeline::new(hook.clone())));
                        Ok(hook)
                    }
                }
            }
        }
    }

    /// Wraps results stored in an RPC message together with their capability table.
    fn rpc(
        message: Rc<::capnp::message::Builder<::capnp::message::HeapAllocator>>,
        cap_table: Vec<Option<Box<dyn ClientHook>>>,
    ) -> Self {
        Self {
            inner: Rc::new(ResultsDoneVariant::Rpc(message, cap_table)),
        }
    }

    /// Wraps locally redirected results together with their capability table.
    fn redirected(
        message: ::capnp::message::Builder<::capnp::message::HeapAllocator>,
        cap_table: Vec<Option<Box<dyn ClientHook>>>,
    ) -> Self {
        Self {
            inner: Rc::new(ResultsDoneVariant::LocallyRedirected(message, cap_table)),
        }
    }
}
2633
2634impl ResultsDoneHook for ResultsDone {
2635 fn add_ref(&self) -> Box<dyn ResultsDoneHook> {
2636 Box::new(Self {
2637 inner: self.inner.clone(),
2638 })
2639 }
2640 fn get(&self) -> ::capnp::Result<any_pointer::Reader<'_>> {
2641 use ::capnp::traits::Imbue;
2642 match *self.inner {
2643 ResultsDoneVariant::Rpc(ref message, ref cap_table) => {
2644 let root: message::Reader = message.get_root_as_reader()?;
2645 let message::Return(ret) = root.which()? else {
2646 unreachable!();
2647 };
2648 let crate::rpc_capnp::return_::Results(payload) = ret?.which()? else {
2649 unreachable!();
2650 };
2651 let mut content = payload?.get_content();
2652 content.imbue(cap_table);
2653 Ok(content)
2654 }
2655 ResultsDoneVariant::LocallyRedirected(ref message, ref cap_table) => {
2656 let mut result: any_pointer::Reader = message.get_root_as_reader()?;
2657 result.imbue(cap_table);
2658 Ok(result)
2659 }
2660 }
2661 }
2662}
2663
/// The kinds of client state a `Client` can wrap, each held by strong reference.
enum ClientVariant<VatId>
where
    VatId: 'static,
{
    /// Addressed by import id (see `ImportClient`).
    Import(Rc<RefCell<ImportClient<VatId>>>),
    /// Addressed as a promised answer: a question id plus a transform (see `PipelineClient`).
    Pipeline(Rc<RefCell<PipelineClient<VatId>>>),
    /// Delegates to whatever capability the promise currently holds (see `PromiseClient`).
    Promise(Rc<RefCell<PromiseClient<VatId>>>),
}
2672
/// `ClientHook` implementation for capabilities that live on an RPC connection.
///
/// Cloning a `Client` shares the same connection state, variant and flow-controller slot.
struct Client<VatId>
where
    VatId: 'static,
{
    /// The connection this client belongs to; also the source of its brand.
    connection_state: Rc<ConnectionState<VatId>>,
    /// The concrete kind of client being wrapped.
    variant: ClientVariant<VatId>,
    /// Shared slot for an optional flow controller; starts out as `None` in `Client::new`.
    flow_controller: Rc<RefCell<Option<Box<dyn crate::FlowController>>>>,
}
2681
/// Weak-reference counterpart of `ClientVariant`, as produced by `Client::downgrade`.
enum WeakClientVariant<VatId>
where
    VatId: 'static,
{
    /// Weak reference to an `ImportClient`.
    Import(Weak<RefCell<ImportClient<VatId>>>),
    /// Weak reference to a `PipelineClient`.
    Pipeline(Weak<RefCell<PipelineClient<VatId>>>),
    /// Weak reference to a `PromiseClient`.
    Promise(Weak<RefCell<PromiseClient<VatId>>>),
}
2690
/// Non-owning counterpart of `Client`; every field is the weak form of the matching
/// `Client` field. Turned back into a `Client` with `upgrade`.
struct WeakClient<VatId>
where
    VatId: 'static,
{
    /// Weak reference to the connection state.
    connection_state: Weak<ConnectionState<VatId>>,
    /// Weak reference to the wrapped client state.
    variant: WeakClientVariant<VatId>,
    /// Weak reference to the shared flow-controller slot.
    flow_controller: Weak<RefCell<Option<Box<dyn crate::FlowController>>>>,
}
2699
2700impl<VatId> WeakClient<VatId>
2701where
2702 VatId: 'static,
2703{
2704 fn upgrade(&self) -> Option<Client<VatId>> {
2705 let variant = match &self.variant {
2706 WeakClientVariant::Import(ic) => ClientVariant::Import(ic.upgrade()?),
2707 WeakClientVariant::Pipeline(pc) => ClientVariant::Pipeline(pc.upgrade()?),
2708 WeakClientVariant::Promise(pc) => ClientVariant::Promise(pc.upgrade()?),
2709 };
2710 let connection_state = self.connection_state.upgrade()?;
2711 let flow_controller = self.flow_controller.upgrade()?;
2712 Some(Client {
2713 connection_state,
2714 variant,
2715 flow_controller,
2716 })
2717 }
2718}
2719
/// Client state for a capability identified by an import id on a connection.
struct ImportClient<VatId>
where
    VatId: 'static,
{
    /// The connection that owns the import table entry for `import_id`.
    connection_state: Rc<ConnectionState<VatId>>,
    /// Id of this import; used when addressing the capability and when releasing it.
    import_id: ImportId,

    /// Reference count reported in the `Release` message sent on drop (nothing is sent when
    /// it is zero). Incremented by `add_remote_ref`.
    remote_ref_count: u32,
}
2730
2731impl<VatId> Drop for ImportClient<VatId> {
2732 fn drop(&mut self) {
2733 let connection_state = self.connection_state.clone();
2734
2735 assert!(connection_state
2736 .client_downcast_map
2737 .borrow_mut()
2738 .remove(&((self) as *const _ as usize))
2739 .is_some());
2740
2741 connection_state
2746 .imports
2747 .borrow_mut()
2748 .slots
2749 .remove(&self.import_id);
2750
2751 let mut tmp = connection_state.connection.borrow_mut();
2753 if let (true, Ok(c)) = (self.remote_ref_count > 0, tmp.as_mut()) {
2754 let mut message = c.new_outgoing_message(10);
2755 {
2756 let root: message::Builder = message.get_body().unwrap().init_as();
2757 let mut release = root.init_release();
2758 release.set_id(self.import_id);
2759 release.set_reference_count(self.remote_ref_count);
2760 }
2761 let _ = message.send();
2762 }
2763 }
2764}
2765
2766impl<VatId> ImportClient<VatId>
2767where
2768 VatId: 'static,
2769{
2770 fn new(
2771 connection_state: &Rc<ConnectionState<VatId>>,
2772 import_id: ImportId,
2773 ) -> Rc<RefCell<Self>> {
2774 Rc::new(RefCell::new(Self {
2775 connection_state: connection_state.clone(),
2776 import_id,
2777 remote_ref_count: 0,
2778 }))
2779 }
2780
2781 fn add_remote_ref(&mut self) {
2782 self.remote_ref_count += 1;
2783 }
2784}
2785
2786impl<VatId> From<Rc<RefCell<ImportClient<VatId>>>> for Client<VatId> {
2787 fn from(client: Rc<RefCell<ImportClient<VatId>>>) -> Self {
2788 let connection_state = client.borrow().connection_state.clone();
2789 Self::new(&connection_state, ClientVariant::Import(client))
2790 }
2791}
2792
/// Client state for a capability addressed as part of the answer to an outstanding question.
struct PipelineClient<VatId>
where
    VatId: 'static,
{
    /// The connection the question was sent on.
    connection_state: Rc<ConnectionState<VatId>>,
    /// The question whose id is written as the promised answer's question id.
    question_ref: Rc<RefCell<QuestionRef<VatId>>>,
    /// Pipeline operations written as the promised answer's transform.
    ops: Vec<PipelineOp>,
}
2802
2803impl<VatId> PipelineClient<VatId>
2804where
2805 VatId: 'static,
2806{
2807 fn new(
2808 connection_state: &Rc<ConnectionState<VatId>>,
2809 question_ref: Rc<RefCell<QuestionRef<VatId>>>,
2810 ops: Vec<PipelineOp>,
2811 ) -> Rc<RefCell<Self>> {
2812 Rc::new(RefCell::new(Self {
2813 connection_state: connection_state.clone(),
2814 question_ref,
2815 ops,
2816 }))
2817 }
2818}
2819
2820impl<VatId> From<Rc<RefCell<PipelineClient<VatId>>>> for Client<VatId> {
2821 fn from(client: Rc<RefCell<PipelineClient<VatId>>>) -> Self {
2822 let connection_state = client.borrow().connection_state.clone();
2823 Self::new(&connection_state, ClientVariant::Pipeline(client))
2824 }
2825}
2826
impl<VatId> Drop for PipelineClient<VatId> {
    /// Removes this client's entry from the connection's downcast map.
    ///
    /// Panics if no entry is registered under this client's address.
    fn drop(&mut self) {
        // The map is keyed by the address of the client state (see `Client::get_ptr`).
        assert!(self
            .connection_state
            .client_downcast_map
            .borrow_mut()
            .remove(&((self) as *const _ as usize))
            .is_some());
    }
}
2837
/// Client state for a capability that may later be replaced by another one via `resolve`.
struct PromiseClient<VatId>
where
    VatId: 'static,
{
    /// The connection this promise belongs to.
    connection_state: Rc<ConnectionState<VatId>>,
    /// Set to `true` at the end of `resolve`.
    is_resolved: bool,
    /// The capability currently delegated to; swapped for the replacement in `resolve`.
    cap: Box<dyn ClientHook>,
    /// When `Some`, the import table entry whose `app_client` back-pointer is cleared on drop
    /// if it still refers to this object.
    import_id: Option<ImportId>,
    /// Set once a message target or cap descriptor has been written through this client;
    /// `resolve` consults it when deciding whether an embargo is needed.
    received_call: bool,
    /// Waiters registered through `when_more_resolved`; each receives the replacement
    /// capability in `resolve`.
    resolution_waiters: crate::sender_queue::SenderQueue<(), Box<dyn ClientHook>>,
}
2851
impl<VatId> PromiseClient<VatId> {
    /// Creates unresolved promise state that initially delegates to `initial`.
    fn new(
        connection_state: &Rc<ConnectionState<VatId>>,
        initial: Box<dyn ClientHook>,
        import_id: Option<ImportId>,
    ) -> Rc<RefCell<Self>> {
        Rc::new(RefCell::new(Self {
            connection_state: connection_state.clone(),
            is_resolved: false,
            cap: initial,
            import_id,
            received_call: false,
            resolution_waiters: crate::sender_queue::SenderQueue::new(),
        }))
    }

    /// Resolves this promise to `replacement`, or to a broken capability built from the error.
    ///
    /// All queued resolution waiters receive the replacement, `cap` is swapped, and
    /// `is_resolved` is set. When an embargo is required (see the condition below), the
    /// capability actually installed is a queued client that only resolves to `replacement`
    /// once the embargo promise completes.
    fn resolve(&mut self, replacement: Result<Box<dyn ClientHook>, Error>) {
        let (mut replacement, is_error) = match replacement {
            Ok(v) => (v, false),
            Err(e) => (broken::new_cap(e), true),
        };
        let connection_state = self.connection_state.clone();
        let is_connected = connection_state.connection.borrow().is_ok();
        let replacement_brand = replacement.get_brand();
        // Embargo path: taken only when the replacement has a different brand than this
        // connection, something was already addressed through this promise, the resolution
        // is not an error, and the connection is still up.
        if replacement_brand != connection_state.get_brand()
            && self.received_call
            && !is_error
            && is_connected
        {
            let (fulfiller, promise) = oneshot::channel::<Result<(), Error>>();
            // Flatten "sender dropped" and the carried `Result` into a single error channel.
            let promise = promise
                .map_err(crate::canceled_to_error)
                .and_then(future::ready);
            let embargo = Embargo::new(fulfiller);
            let embargo_id = connection_state.embargoes.borrow_mut().push(embargo);

            // Build a `Disembargo` with a sender-loopback context aimed at the current target.
            let mut message = connection_state
                .new_outgoing_message(50)
                .expect("no connection?"); {
                let root: message::Builder = message.get_body().unwrap().init_as();
                let mut disembargo = root.init_disembargo();
                disembargo
                    .reborrow()
                    .init_context()
                    .set_sender_loopback(embargo_id);
                let target = disembargo.init_target();

                let redirect = connection_state.write_target(&*self.cap, target);
                if redirect.is_some() {
                    panic!("Original promise target should always be from this RPC connection.")
                }
            }

            // Yields `replacement` once the embargo promise completes successfully.
            let embargo_promise = promise.map_ok(move |()| replacement);

            let mut queued_client = queued::Client::new(None);
            // Held weakly so the driving future does not keep the queued client alive.
            let weak_queued = Rc::downgrade(&queued_client.inner);

            queued_client.drive(embargo_promise.then(move |r| {
                if let Some(q) = weak_queued.upgrade() {
                    queued::ClientInner::resolve(&q, r);
                }
                Promise::ok(())
            }));

            // Until then, the queued client stands in for the real replacement.
            replacement = Box::new(queued_client);

            let _ = message.send();
        }

        for ((), waiter) in self.resolution_waiters.drain() {
            let _ = waiter.send(replacement.clone());
        }

        let old_cap = mem::replace(&mut self.cap, replacement);
        // The previous capability is dropped inside a task added to the connection rather
        // than here.
        connection_state.add_task(async move {
            drop(old_cap);
            Ok(())
        });

        self.is_resolved = true;
    }
}
2943
2944impl<VatId> Drop for PromiseClient<VatId> {
2945 fn drop(&mut self) {
2946 let self_ptr = (self) as *const _ as usize;
2947
2948 if let Some(id) = self.import_id {
2949 let slots = &mut self.connection_state.imports.borrow_mut().slots;
2954 if let Some(import) = slots.get_mut(&id) {
2955 if let Some(c) = &import.app_client {
2956 if let Some(cs) = c.upgrade() {
2957 if cs.get_ptr() == self_ptr {
2958 import.app_client = None;
2959 }
2960 }
2961 }
2962 }
2963 }
2964
2965 assert!(self
2966 .connection_state
2967 .client_downcast_map
2968 .borrow_mut()
2969 .remove(&self_ptr)
2970 .is_some());
2971 }
2972}
2973
2974impl<VatId> From<Rc<RefCell<PromiseClient<VatId>>>> for Client<VatId> {
2975 fn from(client: Rc<RefCell<PromiseClient<VatId>>>) -> Self {
2976 let connection_state = client.borrow().connection_state.clone();
2977 Self::new(&connection_state, ClientVariant::Promise(client))
2978 }
2979}
2980
2981impl<VatId> Client<VatId> {
2982 fn new(connection_state: &Rc<ConnectionState<VatId>>, variant: ClientVariant<VatId>) -> Self {
2983 let client = Self {
2984 connection_state: connection_state.clone(),
2985 variant,
2986 flow_controller: Rc::new(RefCell::new(None)),
2987 };
2988 let weak = client.downgrade();
2989
2990 connection_state
2992 .client_downcast_map
2993 .borrow_mut()
2994 .insert(client.get_ptr(), weak);
2995 client
2996 }
2997 fn downgrade(&self) -> WeakClient<VatId> {
2998 let variant = match &self.variant {
2999 ClientVariant::Import(import_client) => {
3000 WeakClientVariant::Import(Rc::downgrade(import_client))
3001 }
3002 ClientVariant::Pipeline(pipeline_client) => {
3003 WeakClientVariant::Pipeline(Rc::downgrade(pipeline_client))
3004 }
3005 ClientVariant::Promise(promise_client) => {
3006 WeakClientVariant::Promise(Rc::downgrade(promise_client))
3007 }
3008 };
3009 WeakClient {
3010 connection_state: Rc::downgrade(&self.connection_state),
3011 variant,
3012 flow_controller: Rc::downgrade(&self.flow_controller),
3013 }
3014 }
3015
3016 fn from_ptr(ptr: usize, connection_state: &ConnectionState<VatId>) -> Option<Self> {
3017 match connection_state.client_downcast_map.borrow().get(&ptr) {
3018 Some(c) => c.upgrade(),
3019 None => None,
3020 }
3021 }
3022
3023 fn write_target(
3024 &self,
3025 mut target: crate::rpc_capnp::message_target::Builder,
3026 ) -> Option<Box<dyn ClientHook>> {
3027 match &self.variant {
3028 ClientVariant::Import(import_client) => {
3029 target.set_imported_cap(import_client.borrow().import_id);
3030 None
3031 }
3032 ClientVariant::Pipeline(pipeline_client) => {
3033 let mut builder = target.init_promised_answer();
3034 let question_ref = &pipeline_client.borrow().question_ref;
3035 builder.set_question_id(question_ref.borrow().id);
3036 let mut transform =
3037 builder.init_transform(pipeline_client.borrow().ops.len() as u32);
3038 for idx in 0..pipeline_client.borrow().ops.len() {
3039 if let ::capnp::private::capability::PipelineOp::GetPointerField(ordinal) =
3040 pipeline_client.borrow().ops[idx]
3041 {
3042 transform
3043 .reborrow()
3044 .get(idx as u32)
3045 .set_get_pointer_field(ordinal);
3046 }
3047 }
3048 None
3049 }
3050 ClientVariant::Promise(promise_client) => {
3051 promise_client.borrow_mut().received_call = true;
3052 self.connection_state
3053 .write_target(&*promise_client.borrow().cap, target)
3054 }
3055 }
3056 }
3057
3058 fn write_descriptor(&self, mut descriptor: cap_descriptor::Builder) -> Option<u32> {
3059 match &self.variant {
3060 ClientVariant::Import(import_client) => {
3061 descriptor.set_receiver_hosted(import_client.borrow().import_id);
3062 None
3063 }
3064 ClientVariant::Pipeline(pipeline_client) => {
3065 let mut promised_answer = descriptor.init_receiver_answer();
3066 let question_ref = &pipeline_client.borrow().question_ref;
3067 promised_answer.set_question_id(question_ref.borrow().id);
3068 let mut transform =
3069 promised_answer.init_transform(pipeline_client.borrow().ops.len() as u32);
3070 for idx in 0..pipeline_client.borrow().ops.len() {
3071 if let ::capnp::private::capability::PipelineOp::GetPointerField(ordinal) =
3072 pipeline_client.borrow().ops[idx]
3073 {
3074 transform
3075 .reborrow()
3076 .get(idx as u32)
3077 .set_get_pointer_field(ordinal);
3078 }
3079 }
3080
3081 None
3082 }
3083 ClientVariant::Promise(promise_client) => {
3084 promise_client.borrow_mut().received_call = true;
3085
3086 ConnectionState::write_descriptor(
3087 &self.connection_state.clone(),
3088 promise_client.borrow().cap.clone(),
3089 descriptor,
3090 )
3091 .unwrap()
3092 }
3093 }
3094 }
3095}
3096
3097impl<VatId> Clone for Client<VatId> {
3098 fn clone(&self) -> Self {
3099 let variant = match &self.variant {
3100 ClientVariant::Import(import_client) => ClientVariant::Import(import_client.clone()),
3101 ClientVariant::Pipeline(pipeline_client) => {
3102 ClientVariant::Pipeline(pipeline_client.clone())
3103 }
3104 ClientVariant::Promise(promise_client) => {
3105 ClientVariant::Promise(promise_client.clone())
3106 }
3107 };
3108 Self {
3109 connection_state: self.connection_state.clone(),
3110 variant,
3111 flow_controller: self.flow_controller.clone(),
3112 }
3113 }
3114}
3115
3116impl<VatId> ClientHook for Client<VatId> {
3117 fn add_ref(&self) -> Box<dyn ClientHook> {
3118 Box::new(self.clone())
3119 }
3120 fn new_call(
3121 &self,
3122 interface_id: u64,
3123 method_id: u16,
3124 size_hint: Option<::capnp::MessageSize>,
3125 ) -> ::capnp::capability::Request<any_pointer::Owned, any_pointer::Owned> {
3126 let request: Box<dyn RequestHook> =
3127 match Request::new(self.connection_state.clone(), size_hint, self.clone()) {
3128 Ok(mut request) => {
3129 {
3130 let mut call_builder = request.init_call();
3131 call_builder.set_interface_id(interface_id);
3132 call_builder.set_method_id(method_id);
3133 }
3134 Box::new(request)
3135 }
3136 Err(e) => Box::new(broken::Request::new(e, None)),
3137 };
3138
3139 ::capnp::capability::Request::new(request)
3140 }
3141
3142 fn call(
3143 &self,
3144 interface_id: u64,
3145 method_id: u16,
3146 params: Box<dyn ParamsHook>,
3147 mut results: Box<dyn ResultsHook>,
3148 ) -> Promise<(), Error> {
3149 let maybe_request = params.get().and_then(|p| {
3152 let mut request = p
3153 .target_size()
3154 .map(|s| self.new_call(interface_id, method_id, Some(s)))?;
3155 request.get().set_as(p)?;
3156 Ok(request)
3157 });
3158
3159 match maybe_request {
3160 Err(e) => Promise::err(e),
3161 Ok(request) => {
3162 let ::capnp::capability::RemotePromise { promise, .. } = request.send();
3163
3164 Promise::from_future(async move {
3165 let response = promise.await?;
3166 results.get()?.set_as(response.get()?)?;
3167 Ok(())
3168 })
3169 }
3170 }
3171 }
3178
3179 fn get_ptr(&self) -> usize {
3180 match &self.variant {
3181 ClientVariant::Import(import_client) => (&*import_client.borrow()) as *const _ as usize,
3182 ClientVariant::Pipeline(pipeline_client) => {
3183 (&*pipeline_client.borrow()) as *const _ as usize
3184 }
3185 ClientVariant::Promise(promise_client) => {
3186 (&*promise_client.borrow()) as *const _ as usize
3187 }
3188 }
3189 }
3190
3191 fn get_brand(&self) -> usize {
3192 self.connection_state.get_brand()
3193 }
3194
3195 fn get_resolved(&self) -> Option<Box<dyn ClientHook>> {
3196 match &self.variant {
3197 ClientVariant::Import(_import_client) => None,
3198 ClientVariant::Pipeline(_pipeline_client) => None,
3199 ClientVariant::Promise(promise_client) => {
3200 if promise_client.borrow().is_resolved {
3201 Some(promise_client.borrow().cap.clone())
3202 } else {
3203 None
3204 }
3205 }
3206 }
3207 }
3208
3209 fn when_more_resolved(&self) -> Option<Promise<Box<dyn ClientHook>, Error>> {
3210 match &self.variant {
3211 ClientVariant::Import(_import_client) => None,
3212 ClientVariant::Pipeline(_pipeline_client) => None,
3213 ClientVariant::Promise(promise_client) => {
3214 Some(promise_client.borrow_mut().resolution_waiters.push(()))
3215 }
3216 }
3217 }
3218
3219 fn when_resolved(&self) -> Promise<(), Error> {
3220 default_when_resolved_impl(self)
3221 }
3222}
3223
3224pub(crate) fn default_when_resolved_impl<C>(client: &C) -> Promise<(), Error>
3225where
3226 C: ClientHook,
3227{
3228 match client.when_more_resolved() {
3229 Some(promise) => {
3230 Promise::from_future(promise.and_then(|resolution| resolution.when_resolved()))
3231 }
3232 None => Promise::ok(()),
3233 }
3234}
3235
/// A pipeline that exposes exactly one capability: it is returned for an empty transform,
/// and any other transform yields a broken capability.
struct SingleCapPipeline {
    /// The capability returned for an empty transform.
    cap: Box<dyn ClientHook>,
}
3241
impl SingleCapPipeline {
    /// Creates a pipeline whose only reachable capability is `cap`.
    fn new(cap: Box<dyn ClientHook>) -> Self {
        Self { cap }
    }
}
3247
3248impl PipelineHook for SingleCapPipeline {
3249 fn add_ref(&self) -> Box<dyn PipelineHook> {
3250 Box::new(Self {
3251 cap: self.cap.clone(),
3252 })
3253 }
3254 fn get_pipelined_cap(&self, ops: &[PipelineOp]) -> Box<dyn ClientHook> {
3255 if ops.is_empty() {
3256 self.cap.add_ref()
3257 } else {
3258 broken::new_cap(Error::failed("Invalid pipeline transform.".to_string()))
3259 }
3260 }
3261}