1mod broker_subscriptions;
2mod proxies;
3mod select;
4
5use crate::bus_listener::{BusListener, BusListenerHandle};
6#[cfg(feature = "introspection")]
7use crate::core::introspection::{DynIntrospectable, Introspection, References};
8use crate::core::message::{
9 AbortFunctionCall, AddBusListenerFilter, AddChannelCapacity, BusListenerCurrentFinished,
10 CallFunction, CallFunction2, CallFunctionReply, CallFunctionResult, ChannelEndClaimed,
11 ChannelEndClosed, ClaimChannelEnd, ClaimChannelEndReply, ClaimChannelEndResult,
12 ClearBusListenerFilters, CloseChannelEnd, CloseChannelEndReply, CloseChannelEndResult,
13 Connect2, ConnectData, ConnectResult, CreateBusListener, CreateBusListenerReply, CreateChannel,
14 CreateChannelReply, CreateObject, CreateObjectReply, CreateObjectResult, CreateService,
15 CreateService2, CreateServiceReply, CreateServiceResult, DestroyBusListener,
16 DestroyBusListenerReply, DestroyBusListenerResult, DestroyObject, DestroyObjectReply,
17 DestroyObjectResult, DestroyService, DestroyServiceReply, DestroyServiceResult, EmitBusEvent,
18 EmitEvent, ItemReceived, Message, QueryIntrospection, QueryIntrospectionReply,
19 QueryIntrospectionResult, QueryServiceInfo, QueryServiceInfoReply, QueryServiceInfoResult,
20 QueryServiceVersion, QueryServiceVersionReply, QueryServiceVersionResult,
21 RemoveBusListenerFilter, SendItem, ServiceDestroyed, Shutdown, StartBusListener,
22 StartBusListenerReply, StartBusListenerResult, StopBusListener, StopBusListenerReply,
23 StopBusListenerResult, SubscribeAllEvents, SubscribeAllEventsReply, SubscribeAllEventsResult,
24 SubscribeEvent, SubscribeEventReply, SubscribeEventResult, SubscribeService,
25 SubscribeServiceReply, SubscribeServiceResult, Sync, SyncReply, UnsubscribeAllEvents,
26 UnsubscribeAllEventsReply, UnsubscribeAllEventsResult, UnsubscribeEvent, UnsubscribeService,
27};
28use crate::core::transport::{AsyncTransport, AsyncTransportExt};
29#[cfg(feature = "introspection")]
30use crate::core::TypeId;
31use crate::core::{
32 BusListenerCookie, ChannelCookie, ChannelEnd, ChannelEndWithCapacity, Deserialize, ObjectId,
33 ProtocolVersion, Serialize, SerializedValue, SerializedValueSlice, ServiceCookie, ServiceId,
34 ServiceInfo,
35};
36use crate::error::{ConnectError, RunError};
37use crate::function_call_map::FunctionCallMap;
38#[cfg(feature = "introspection")]
39use crate::handle::request::QueryIntrospectionRequest;
40use crate::handle::request::{
41 CallFunctionReplyRequest, CallFunctionRequest, ClaimReceiverRequest, ClaimSenderRequest,
42 CloseChannelEndRequest, CreateBusListenerRequest, CreateClaimedReceiverRequest,
43 CreateClaimedSenderRequest, CreateLifetimeListenerRequest, CreateObjectRequest,
44 CreateProxyRequest, CreateServiceRequest, DestroyBusListenerRequest, DestroyObjectRequest,
45 DestroyServiceRequest, EmitEventRequest, HandleRequest, SendItemRequest,
46 StartBusListenerRequest, StopBusListenerRequest, SubscribeAllEventsRequest,
47 SubscribeEventRequest, SyncBrokerRequest, SyncClientRequest, UnsubscribeAllEventsRequest,
48 UnsubscribeEventRequest,
49};
50use crate::lifetime::LifetimeListener;
51use crate::low_level::{
52 PendingReceiver, PendingSender, ProxyId, RawCall, Service, UnclaimedReceiver, UnclaimedSender,
53};
54use crate::serial_map::SerialMap;
55use crate::{Error, Handle, Object};
56use broker_subscriptions::BrokerSubscriptions;
57use futures_channel::{mpsc, oneshot};
58use proxies::{Proxies, SubscribeResult};
59use select::{Select, Selected};
60use std::collections::HashMap;
61use std::mem;
62use std::time::Instant;
63
/// Newest protocol version implemented by this client.
///
/// Its major and minor components are offered to the broker during the handshake in
/// `Client::connect_with_data`, and it is the upper bound for the version accepted back from the
/// broker.
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V1_19;
65
/// Client side of a broker connection.
///
/// A `Client` owns the transport `T` and all bookkeeping that is required to match replies coming
/// from the broker with the requests that are still waiting for them. It is created with one of the
/// `connect*` functions and does nothing until [`run`](Self::run) is awaited.
#[derive(Debug)]
#[must_use = "clients do nothing unless you `.await` or poll `Client::run()`"]
pub struct Client<T>
where
    T: AsyncTransport + Unpin,
{
    // Multiplexes the transport, the handle request channel and the function call map (see
    // `select()`).
    select: Select,

    // Transport over which all messages are exchanged with the broker.
    t: T,

    // Protocol version negotiated during the handshake; never newer than `PROTOCOL_VERSION`.
    protocol_version: ProtocolVersion,

    // Receiving end of the request channel whose sending end is wrapped by `handle`.
    recv: mpsc::UnboundedReceiver<HandleRequest>,

    // Handle owned by the client itself; clones of it are given to objects, services, channels
    // and listeners created by the client.
    handle: Handle,

    // Starts at 1 (the client's own handle); `run()` stops as soon as it is back at 1.
    // NOTE(review): presumably updated by handle requests handled outside of this view.
    num_handles: usize,

    // Requests awaiting a reply from the broker, keyed by the serial of the request.
    create_object: SerialMap<CreateObjectRequest>,
    destroy_object: SerialMap<oneshot::Sender<DestroyObjectResult>>,
    create_service: SerialMap<CreateServiceRequest>,
    destroy_service: SerialMap<DestroyServiceRequest>,

    // Outgoing function calls awaiting a `CallFunctionReply`.
    function_calls: FunctionCallMap,

    // Services created through this client; incoming calls are forwarded through the sender.
    services: HashMap<ServiceCookie, mpsc::UnboundedSender<RawCall>>,

    // Updated from `SubscribeEvent`/`UnsubscribeEvent` messages and when a service is destroyed.
    broker_subscriptions: BrokerSubscriptions,

    // Channel requests awaiting a reply from the broker, keyed by serial.
    create_channel: SerialMap<CreateChannelData>,
    close_channel_end: SerialMap<CloseChannelEndRequest>,
    claim_channel_end: SerialMap<ClaimChannelEndData>,

    // State of the channel ends known to this client, keyed by channel cookie.
    senders: HashMap<ChannelCookie, SenderState>,
    receivers: HashMap<ChannelCookie, ReceiverState>,

    // Broker synchronization requests awaiting a `SyncReply`.
    sync: SerialMap<SyncBrokerRequest>,

    // Bus listener requests awaiting a reply from the broker, keyed by serial.
    create_bus_listener: SerialMap<CreateBusListenerData>,
    destroy_bus_listener: SerialMap<DestroyBusListenerRequest>,
    start_bus_listener: SerialMap<StartBusListenerRequest>,
    stop_bus_listener: SerialMap<StopBusListenerRequest>,

    // Bus listeners (and lifetime listeners) created through this client.
    bus_listeners: HashMap<BusListenerCookie, BusListenerHandle>,

    // One entry per incoming call that was forwarded to a service, keyed by the call's serial.
    // The entry is removed when the broker sends `AbortFunctionCall` for that serial.
    abort_call_handles: HashMap<u32, oneshot::Sender<()>>,

    // Proxy creation requests waiting for the service's info or version.
    query_service_info: SerialMap<CreateProxyRequest>,
    query_service_version: SerialMap<CreateProxyRequest>,

    // Subscription requests awaiting a reply from the broker, keyed by serial.
    subscribe_event: SerialMap<SubscribeEventRequest>,
    subscribe_service: SerialMap<ServiceCookie>,
    subscribe_all_events: SerialMap<SubscribeAllEventsRequest>,
    unsubscribe_all_events: SerialMap<UnsubscribeAllEventsRequest>,

    // Proxies created through this client; events from the broker are routed through it.
    proxies: Proxies,

    // Serialized introspection served in response to `QueryIntrospection` messages.
    #[cfg(feature = "introspection")]
    introspection: HashMap<TypeId, SerializedValue>,

    // Introspection queries awaiting a `QueryIntrospectionReply`.
    #[cfg(feature = "introspection")]
    query_introspection: SerialMap<QueryIntrospectionRequest>,
}
124
125impl<T> Client<T>
126where
127 T: AsyncTransport + Unpin,
128{
129 pub async fn connect(t: T) -> Result<Self, ConnectError<T::Error>> {
169 let (client, _) = Self::connect_with_data::<()>(t, None).await?;
170 Ok(client)
171 }
172
173 pub async fn connect_with_data<D: Serialize + ?Sized>(
178 mut t: T,
179 data: Option<&D>,
180 ) -> Result<(Self, Option<SerializedValue>), ConnectError<T::Error>> {
181 let mut connect_data = ConnectData::new();
182
183 if let Some(data) = data {
184 connect_data.serialize_user(data)?;
185 }
186
187 let connect = Connect2::with_serialize_data(
188 PROTOCOL_VERSION.major(),
189 PROTOCOL_VERSION.minor(),
190 &connect_data,
191 )?;
192
193 t.send_and_flush(connect)
194 .await
195 .map_err(ConnectError::Transport)?;
196
197 let connect_reply = match t.receive().await.map_err(ConnectError::Transport)? {
198 Message::ConnectReply2(connect_reply) => connect_reply,
199 msg => return Err(ConnectError::UnexpectedMessageReceived(msg)),
200 };
201
202 let connect_reply_data = connect_reply.deserialize_connect_data()?;
203
204 let minor_version = match connect_reply.result {
205 ConnectResult::Ok(minor_version) => minor_version,
206 ConnectResult::Rejected => return Err(ConnectError::Rejected(connect_reply_data.user)),
207 ConnectResult::IncompatibleVersion => return Err(ConnectError::IncompatibleVersion),
208 };
209
210 let protocol_version = ProtocolVersion::new(PROTOCOL_VERSION.major(), minor_version)
211 .map_err(|_| ConnectError::IncompatibleVersion)?;
212
213 if protocol_version > PROTOCOL_VERSION {
214 return Err(ConnectError::IncompatibleVersion);
215 }
216
217 let (send, recv) = mpsc::unbounded();
218 let client = Self {
219 select: Select::new(),
220 t,
221 protocol_version,
222 recv,
223 handle: Handle::new(send),
224 num_handles: 1,
225 create_object: SerialMap::new(),
226 destroy_object: SerialMap::new(),
227 create_service: SerialMap::new(),
228 destroy_service: SerialMap::new(),
229 function_calls: FunctionCallMap::new(),
230 services: HashMap::new(),
231 broker_subscriptions: BrokerSubscriptions::new(),
232 create_channel: SerialMap::new(),
233 close_channel_end: SerialMap::new(),
234 claim_channel_end: SerialMap::new(),
235 senders: HashMap::new(),
236 receivers: HashMap::new(),
237 sync: SerialMap::new(),
238 create_bus_listener: SerialMap::new(),
239 destroy_bus_listener: SerialMap::new(),
240 start_bus_listener: SerialMap::new(),
241 stop_bus_listener: SerialMap::new(),
242 bus_listeners: HashMap::new(),
243 abort_call_handles: HashMap::new(),
244 query_service_info: SerialMap::new(),
245 query_service_version: SerialMap::new(),
246 subscribe_event: SerialMap::new(),
247 subscribe_service: SerialMap::new(),
248 subscribe_all_events: SerialMap::new(),
249 unsubscribe_all_events: SerialMap::new(),
250 proxies: Proxies::new(),
251 #[cfg(feature = "introspection")]
252 introspection: HashMap::new(),
253 #[cfg(feature = "introspection")]
254 query_introspection: SerialMap::new(),
255 };
256
257 Ok((client, connect_reply_data.user))
258 }
259
260 pub async fn connect_with_data_and_deserialize<D1, D2>(
287 t: T,
288 data: Option<&D1>,
289 ) -> Result<(Self, Option<D2>), ConnectError<T::Error>>
290 where
291 D1: Serialize + ?Sized,
292 D2: Deserialize,
293 {
294 let (client, data) = Self::connect_with_data(t, data).await?;
295 let data = data
296 .as_deref()
297 .map(SerializedValueSlice::deserialize)
298 .transpose()?;
299
300 Ok((client, data))
301 }
302
    /// Returns a reference to the [`Handle`] that was created together with this client.
    ///
    /// The handle is borrowed; clone it to obtain an owned handle.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }
312
313 pub fn protocol_version(self) -> ProtocolVersion {
315 self.protocol_version
316 }
317
318 pub async fn run(mut self) -> Result<(), RunError<T::Error>> {
334 loop {
335 match self.select().await {
336 Selected::Transport(Ok(Message::Shutdown(Shutdown))) => {
337 self.t.send_and_flush(Shutdown).await?;
338 return Ok(());
339 }
340
341 Selected::Transport(Ok(msg)) => self.handle_message(msg).await?,
342 Selected::Transport(Err(e)) => return Err(e.into()),
343 Selected::Handle(HandleRequest::Shutdown) => break,
344 Selected::Handle(req) => self.handle_request(req).await?,
345 Selected::AbortFunctionCall(serial) => self.abort_function_call(serial).await?,
346 }
347
348 if self.num_handles == 1 {
349 break;
350 }
351 }
352
353 self.t.send_and_flush(Shutdown).await?;
354 self.drain_transport().await?;
355 Ok(())
356 }
357
358 async fn select(&mut self) -> Selected<T> {
359 self.select
360 .select(&mut self.t, &mut self.recv, &mut self.function_calls)
361 .await
362 }
363
364 async fn drain_transport(&mut self) -> Result<(), RunError<T::Error>> {
365 loop {
366 if let Message::Shutdown(Shutdown) = self.t.receive().await? {
367 return Ok(());
368 }
369 }
370 }
371
    /// Dispatches a message received from the broker to the matching `msg_*` handler.
    ///
    /// Messages that are not expected to arrive at a client are rejected with
    /// [`RunError::UnexpectedMessageReceived`]. `Shutdown` must not be passed to this function; it
    /// is intercepted by `run()` before dispatching.
    ///
    /// # Errors
    ///
    /// Propagates the error of the invoked handler.
    ///
    /// # Panics
    ///
    /// Panics if `msg` is `Message::Shutdown`.
    async fn handle_message(&mut self, msg: Message) -> Result<(), RunError<T::Error>> {
        match msg {
            Message::CreateObjectReply(msg) => self.msg_create_object_reply(msg)?,
            Message::DestroyObjectReply(msg) => self.msg_destroy_object_reply(msg),
            Message::CreateServiceReply(msg) => self.msg_create_service_reply(msg)?,
            Message::DestroyServiceReply(msg) => self.msg_destroy_service_reply(msg),
            Message::CallFunction(msg) => self.msg_call_function(msg).await?,
            Message::CallFunction2(msg) => self.msg_call_function2(msg).await?,
            Message::CallFunctionReply(msg) => self.msg_call_function_reply(msg),
            Message::SubscribeEvent(msg) => self.msg_subscribe_event(msg),
            Message::UnsubscribeEvent(msg) => self.msg_unsubscribe_event(msg),
            Message::CreateChannelReply(msg) => self.msg_create_channel_reply(msg)?,
            Message::CloseChannelEndReply(msg) => self.msg_close_channel_end_reply(msg)?,
            Message::ChannelEndClosed(msg) => self.msg_channel_end_closed(msg)?,
            Message::ClaimChannelEndReply(msg) => self.msg_claim_channel_end_reply(msg)?,
            Message::ChannelEndClaimed(msg) => self.msg_channel_end_claimed(msg)?,
            Message::ItemReceived(msg) => self.msg_item_received(msg)?,
            Message::AddChannelCapacity(msg) => self.msg_add_channel_capacity(msg)?,
            Message::SyncReply(msg) => self.msg_sync_reply(msg)?,
            Message::CreateBusListenerReply(msg) => self.msg_create_bus_listener_reply(msg)?,
            Message::DestroyBusListenerReply(msg) => self.msg_destroy_bus_listener_reply(msg)?,
            Message::StartBusListenerReply(msg) => self.msg_start_bus_listener_reply(msg)?,
            Message::StopBusListenerReply(msg) => self.msg_stop_bus_listener_reply(msg)?,
            Message::EmitBusEvent(msg) => self.msg_emit_bus_event(msg)?,

            Message::BusListenerCurrentFinished(msg) => {
                self.msg_bus_listener_current_finished(msg)?
            }

            Message::AbortFunctionCall(msg) => self.msg_abort_function_call(msg)?,
            Message::QueryIntrospection(msg) => self.msg_query_introspection(msg).await?,
            Message::QueryIntrospectionReply(msg) => self.msg_query_introspection_reply(msg)?,
            Message::QueryServiceInfoReply(msg) => self.msg_query_service_info_reply(msg).await?,

            Message::QueryServiceVersionReply(msg) => {
                self.msg_query_service_version_reply(msg).await?
            }

            Message::SubscribeEventReply(msg) => self.msg_subscribe_event_reply(msg)?,
            Message::EmitEvent(msg) => self.msg_emit_event(msg),
            Message::ServiceDestroyed(msg) => self.msg_service_destroyed(msg),
            Message::SubscribeServiceReply(msg) => self.msg_subscribe_service_reply(msg)?,
            Message::SubscribeAllEvents(msg) => self.msg_subscribe_all_events(msg)?,
            Message::SubscribeAllEventsReply(msg) => self.msg_subscribe_all_events_reply(msg)?,
            Message::UnsubscribeAllEvents(msg) => self.msg_unsubscribe_all_events(msg)?,

            Message::UnsubscribeAllEventsReply(msg) => {
                self.msg_unsubscribe_all_events_reply(msg)?
            }

            // None of these messages has a handler on the client side; receiving one is treated
            // as a protocol violation.
            Message::Connect(_)
            | Message::ConnectReply(_)
            | Message::CreateObject(_)
            | Message::DestroyObject(_)
            | Message::CreateService(_)
            | Message::DestroyService(_)
            | Message::QueryServiceVersion(_)
            | Message::CreateChannel(_)
            | Message::CloseChannelEnd(_)
            | Message::ClaimChannelEnd(_)
            | Message::SendItem(_)
            | Message::Sync(_)
            | Message::CreateBusListener(_)
            | Message::DestroyBusListener(_)
            | Message::AddBusListenerFilter(_)
            | Message::RemoveBusListenerFilter(_)
            | Message::ClearBusListenerFilters(_)
            | Message::StartBusListener(_)
            | Message::StopBusListener(_)
            | Message::Connect2(_)
            | Message::ConnectReply2(_)
            | Message::RegisterIntrospection(_)
            | Message::CreateService2(_)
            | Message::QueryServiceInfo(_)
            | Message::SubscribeService(_)
            | Message::UnsubscribeService(_) => {
                return Err(RunError::UnexpectedMessageReceived(msg))
            }

            // `run()` handles `Shutdown` itself and never forwards it to this function.
            Message::Shutdown(Shutdown) => unreachable!(),
        }

        Ok(())
    }
456
457 fn msg_create_object_reply(
458 &mut self,
459 msg: CreateObjectReply,
460 ) -> Result<(), RunError<T::Error>> {
461 let Some(req) = self.create_object.remove(msg.serial) else {
462 return Err(RunError::UnexpectedMessageReceived(msg.into()));
463 };
464
465 let reply = match msg.result {
466 CreateObjectResult::Ok(cookie) => Ok(Object::new_impl(
467 ObjectId::new(req.uuid, cookie),
468 self.handle.clone(),
469 )),
470
471 CreateObjectResult::DuplicateObject => Err(Error::DuplicateObject),
472 };
473
474 let _ = req.reply.send(reply);
475 Ok(())
476 }
477
478 fn msg_destroy_object_reply(&mut self, msg: DestroyObjectReply) {
479 if let Some(send) = self.destroy_object.remove(msg.serial) {
480 let _ = send.send(msg.result);
481 }
482 }
483
484 fn msg_create_service_reply(
485 &mut self,
486 msg: CreateServiceReply,
487 ) -> Result<(), RunError<T::Error>> {
488 let Some(req) = self.create_service.remove(msg.serial) else {
489 return Err(RunError::UnexpectedMessageReceived(msg.into()));
490 };
491
492 let reply = match msg.result {
493 CreateServiceResult::Ok(cookie) => {
494 let (send, function_calls) = mpsc::unbounded();
495 let dup = self.services.insert(cookie, send);
496 debug_assert!(dup.is_none());
497
498 Ok(Service::new_impl(
499 ServiceId::new(req.object_id, req.service_uuid, cookie),
500 req.info,
501 self.handle.clone(),
502 function_calls,
503 ))
504 }
505
506 CreateServiceResult::DuplicateService => Err(Error::DuplicateService),
507 CreateServiceResult::InvalidObject => Err(Error::InvalidObject),
508 CreateServiceResult::ForeignObject => unreachable!(),
509 };
510
511 let _ = req.reply.send(reply);
512 Ok(())
513 }
514
515 fn msg_destroy_service_reply(&mut self, msg: DestroyServiceReply) {
516 let Some(req) = self.destroy_service.remove(msg.serial) else {
517 return;
518 };
519
520 let reply = match msg.result {
521 DestroyServiceResult::Ok => {
522 let contained = self.services.remove(&req.id.cookie);
523 debug_assert!(contained.is_some());
524 self.broker_subscriptions.remove_service(req.id.cookie);
525 Ok(())
526 }
527
528 DestroyServiceResult::InvalidService => Err(Error::InvalidService),
529 DestroyServiceResult::ForeignObject => unreachable!(),
530 };
531
532 let _ = req.reply.send(reply);
533 }
534
535 async fn msg_call_function(&mut self, msg: CallFunction) -> Result<(), RunError<T::Error>> {
536 let msg = CallFunction2 {
537 serial: msg.serial,
538 service_cookie: msg.service_cookie,
539 function: msg.function,
540 version: None,
541 value: msg.value,
542 };
543
544 self.msg_call_function2(msg).await
545 }
546
547 async fn msg_call_function2(&mut self, msg: CallFunction2) -> Result<(), RunError<T::Error>> {
548 let send = self
549 .services
550 .get_mut(&msg.service_cookie)
551 .expect("inconsistent state");
552
553 let (abort_send, abort_recv) = oneshot::channel();
554
555 let req = RawCall {
556 serial: msg.serial,
557 function: msg.function,
558 version: msg.version,
559 timestamp: Instant::now(),
560 args: msg.value,
561 aborted: abort_recv,
562 };
563
564 if send.unbounded_send(req).is_ok() {
565 let dup = self.abort_call_handles.insert(msg.serial, abort_send);
566 assert!(dup.is_none());
567 } else {
568 self.t
569 .send_and_flush(CallFunctionReply {
570 serial: msg.serial,
571 result: CallFunctionResult::InvalidService,
572 })
573 .await?;
574 }
575
576 Ok(())
577 }
578
579 fn msg_call_function_reply(&mut self, msg: CallFunctionReply) {
580 if let Some(send) = self.function_calls.remove(msg.serial) {
581 let _ = send.send(Ok((msg.result, Instant::now())));
582 }
583 }
584
585 fn msg_subscribe_event(&mut self, msg: SubscribeEvent) {
586 self.broker_subscriptions
587 .subscribe(msg.service_cookie, msg.event);
588 }
589
590 fn msg_unsubscribe_event(&mut self, msg: UnsubscribeEvent) {
591 self.broker_subscriptions
592 .unsubscribe(msg.service_cookie, msg.event);
593 }
594
595 fn msg_create_channel_reply(
596 &mut self,
597 msg: CreateChannelReply,
598 ) -> Result<(), RunError<T::Error>> {
599 match self.create_channel.remove(msg.serial) {
600 Some(CreateChannelData::Sender(reply)) => {
601 let (send, recv) = oneshot::channel();
602 let sender = PendingSender::new(self.handle.clone(), msg.cookie, recv);
603 let receiver = UnclaimedReceiver::new(self.handle.clone(), msg.cookie);
604 let dup = self.senders.insert(msg.cookie, SenderState::Pending(send));
605 debug_assert!(dup.is_none());
606 let _ = reply.send((sender, receiver));
607 Ok(())
608 }
609
610 Some(CreateChannelData::Receiver(req)) => {
611 let (send, recv) = oneshot::channel();
612 let sender = UnclaimedSender::new(self.handle.clone(), msg.cookie);
613 let receiver =
614 PendingReceiver::new(self.handle.clone(), msg.cookie, recv, req.capacity);
615 let dup = self
616 .receivers
617 .insert(msg.cookie, ReceiverState::Pending(send));
618 debug_assert!(dup.is_none());
619 let _ = req.reply.send((sender, receiver));
620 Ok(())
621 }
622
623 None => Err(RunError::UnexpectedMessageReceived(msg.into())),
624 }
625 }
626
627 fn msg_close_channel_end_reply(
628 &mut self,
629 msg: CloseChannelEndReply,
630 ) -> Result<(), RunError<T::Error>> {
631 let Some(req) = self.close_channel_end.remove(msg.serial) else {
632 return Err(RunError::UnexpectedMessageReceived(msg.into()));
633 };
634
635 if req.claimed {
636 match req.end {
637 ChannelEnd::Sender => {
638 let contained = self.senders.remove(&req.cookie);
639 debug_assert!(contained.is_some());
640 }
641
642 ChannelEnd::Receiver => {
643 let contained = self.receivers.remove(&req.cookie);
644 debug_assert!(contained.is_some());
645 }
646 }
647 }
648
649 let res = match msg.result {
650 CloseChannelEndResult::Ok => Ok(()),
651
652 CloseChannelEndResult::InvalidChannel | CloseChannelEndResult::ForeignChannel => {
653 Err(Error::InvalidChannel)
654 }
655 };
656
657 let _ = req.reply.send(res);
658 Ok(())
659 }
660
661 fn msg_channel_end_closed(&mut self, msg: ChannelEndClosed) -> Result<(), RunError<T::Error>> {
662 match msg.end {
663 ChannelEnd::Sender => {
664 let receiver = self
665 .receivers
666 .get_mut(&msg.cookie)
667 .map(|receiver| mem::replace(receiver, ReceiverState::SenderClosed));
668
669 match receiver {
670 Some(ReceiverState::Pending(send)) => {
671 let _ = send.send(Err(Error::InvalidChannel));
672 Ok(())
673 }
674
675 Some(ReceiverState::Established(_)) => Ok(()),
676
677 Some(ReceiverState::SenderClosed) | None => {
678 Err(RunError::UnexpectedMessageReceived(msg.into()))
679 }
680 }
681 }
682
683 ChannelEnd::Receiver => {
684 let sender = self
685 .senders
686 .get_mut(&msg.cookie)
687 .map(|sender| mem::replace(sender, SenderState::ReceiverClosed));
688
689 match sender {
690 Some(SenderState::Pending(send)) => {
691 let _ = send.send(Err(Error::InvalidChannel));
692 Ok(())
693 }
694
695 Some(SenderState::Established(_)) => Ok(()),
696
697 Some(SenderState::ReceiverClosed) | None => {
698 Err(RunError::UnexpectedMessageReceived(msg.into()))
699 }
700 }
701 }
702 }
703 }
704
705 fn msg_claim_channel_end_reply(
706 &mut self,
707 msg: ClaimChannelEndReply,
708 ) -> Result<(), RunError<T::Error>> {
709 let Some(req) = self.claim_channel_end.remove(msg.serial) else {
710 return Err(RunError::UnexpectedMessageReceived(msg.into()));
711 };
712
713 match req {
714 ClaimChannelEndData::Sender(req) => match msg.result {
715 ClaimChannelEndResult::SenderClaimed(capacity) => {
716 let (send, recv) = mpsc::unbounded();
717 let dup = self
718 .senders
719 .insert(req.cookie, SenderState::Established(send));
720 debug_assert!(dup.is_none());
721 let _ = req.reply.send(Ok((recv, capacity)));
722 }
723
724 ClaimChannelEndResult::ReceiverClaimed => {
725 return Err(RunError::UnexpectedMessageReceived(msg.into()))
726 }
727
728 ClaimChannelEndResult::InvalidChannel | ClaimChannelEndResult::AlreadyClaimed => {
729 let _ = req.reply.send(Err(Error::InvalidChannel));
730 }
731 },
732
733 ClaimChannelEndData::Receiver(req) => match msg.result {
734 ClaimChannelEndResult::SenderClaimed(_) => {
735 return Err(RunError::UnexpectedMessageReceived(msg.into()))
736 }
737
738 ClaimChannelEndResult::ReceiverClaimed => {
739 let (send, recv) = mpsc::unbounded();
740 let dup = self
741 .receivers
742 .insert(req.cookie, ReceiverState::Established(send));
743 debug_assert!(dup.is_none());
744 let _ = req.reply.send(Ok((recv, req.capacity)));
745 }
746
747 ClaimChannelEndResult::InvalidChannel | ClaimChannelEndResult::AlreadyClaimed => {
748 let _ = req.reply.send(Err(Error::InvalidChannel));
749 }
750 },
751 }
752
753 Ok(())
754 }
755
756 fn msg_channel_end_claimed(
757 &mut self,
758 msg: ChannelEndClaimed,
759 ) -> Result<(), RunError<T::Error>> {
760 match msg.end {
761 ChannelEndWithCapacity::Sender => {
762 let Some(receiver) = self.receivers.get_mut(&msg.cookie) else {
763 return Err(RunError::UnexpectedMessageReceived(msg.into()));
764 };
765
766 let (send, recv) = mpsc::unbounded();
767
768 match mem::replace(receiver, ReceiverState::Established(send)) {
769 ReceiverState::Pending(send) => {
770 let _ = send.send(Ok(recv));
771 Ok(())
772 }
773
774 ReceiverState::Established(_) | ReceiverState::SenderClosed => {
775 Err(RunError::UnexpectedMessageReceived(msg.into()))
776 }
777 }
778 }
779
780 ChannelEndWithCapacity::Receiver(capacity) => {
781 let Some(sender) = self.senders.get_mut(&msg.cookie) else {
782 return Err(RunError::UnexpectedMessageReceived(msg.into()));
783 };
784
785 let (send, recv) = mpsc::unbounded();
786
787 match mem::replace(sender, SenderState::Established(send)) {
788 SenderState::Pending(send) => {
789 let _ = send.send(Ok((recv, capacity)));
790 Ok(())
791 }
792
793 SenderState::Established(_) | SenderState::ReceiverClosed => {
794 Err(RunError::UnexpectedMessageReceived(msg.into()))
795 }
796 }
797 }
798 }
799 }
800
801 fn msg_item_received(&self, msg: ItemReceived) -> Result<(), RunError<T::Error>> {
802 if let Some(ReceiverState::Established(send)) = self.receivers.get(&msg.cookie) {
803 let _ = send.unbounded_send(msg.value);
804 Ok(())
805 } else {
806 Err(RunError::UnexpectedMessageReceived(msg.into()))
807 }
808 }
809
810 fn msg_add_channel_capacity(&self, msg: AddChannelCapacity) -> Result<(), RunError<T::Error>> {
811 if let Some(SenderState::Established(send)) = self.senders.get(&msg.cookie) {
812 let _ = send.unbounded_send(msg.capacity);
813 Ok(())
814 } else {
815 Err(RunError::UnexpectedMessageReceived(msg.into()))
816 }
817 }
818
819 fn msg_sync_reply(&mut self, msg: SyncReply) -> Result<(), RunError<T::Error>> {
820 if let Some(req) = self.sync.remove(msg.serial) {
821 let _ = req.send(Instant::now());
822 Ok(())
823 } else {
824 Err(RunError::UnexpectedMessageReceived(msg.into()))
825 }
826 }
827
828 fn msg_create_bus_listener_reply(
829 &mut self,
830 msg: CreateBusListenerReply,
831 ) -> Result<(), RunError<T::Error>> {
832 let Some(data) = self.create_bus_listener.remove(msg.serial) else {
833 return Err(RunError::UnexpectedMessageReceived(msg.into()));
834 };
835
836 let (send, recv) = mpsc::unbounded();
837
838 match data {
839 CreateBusListenerData::BusListener(reply) => {
840 let listener = BusListener::new_impl(msg.cookie, self.handle.clone(), recv);
841 let _ = reply.send(listener);
842 }
843
844 CreateBusListenerData::LifetimeListener(reply) => {
845 let listener = LifetimeListener::new(msg.cookie, self.handle.clone(), recv);
846 let _ = reply.send(listener);
847 }
848 }
849
850 let bus_listener_handle = BusListenerHandle::new(send);
851 let dup = self.bus_listeners.insert(msg.cookie, bus_listener_handle);
852 assert!(dup.is_none());
853
854 Ok(())
855 }
856
857 fn msg_destroy_bus_listener_reply(
858 &mut self,
859 msg: DestroyBusListenerReply,
860 ) -> Result<(), RunError<T::Error>> {
861 let Some(req) = self.destroy_bus_listener.remove(msg.serial) else {
862 return Err(RunError::UnexpectedMessageReceived(msg.into()));
863 };
864
865 if msg.result == DestroyBusListenerResult::Ok {
866 let contained = self.bus_listeners.remove(&req.cookie);
867 debug_assert!(contained.is_some());
868 }
869
870 let _ = req.reply.send(msg.result);
871
872 Ok(())
873 }
874
875 fn msg_start_bus_listener_reply(
876 &mut self,
877 msg: StartBusListenerReply,
878 ) -> Result<(), RunError<T::Error>> {
879 let Some(req) = self.start_bus_listener.remove(msg.serial) else {
880 return Err(RunError::UnexpectedMessageReceived(msg.into()));
881 };
882
883 if msg.result == StartBusListenerResult::Ok {
884 let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
885 return Err(RunError::UnexpectedMessageReceived(msg.into()));
886 };
887
888 if bus_listener.start(req.scope) {
889 let _ = req.reply.send(msg.result);
890 Ok(())
891 } else {
892 Err(RunError::UnexpectedMessageReceived(msg.into()))
893 }
894 } else {
895 let _ = req.reply.send(msg.result);
896 Ok(())
897 }
898 }
899
900 fn msg_stop_bus_listener_reply(
901 &mut self,
902 msg: StopBusListenerReply,
903 ) -> Result<(), RunError<T::Error>> {
904 let Some(req) = self.stop_bus_listener.remove(msg.serial) else {
905 return Err(RunError::UnexpectedMessageReceived(msg.into()));
906 };
907
908 if msg.result == StopBusListenerResult::Ok {
909 let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
910 return Err(RunError::UnexpectedMessageReceived(msg.into()));
911 };
912
913 if bus_listener.stop() {
914 let _ = req.reply.send(msg.result);
915 Ok(())
916 } else {
917 Err(RunError::UnexpectedMessageReceived(msg.into()))
918 }
919 } else {
920 let _ = req.reply.send(msg.result);
921 Ok(())
922 }
923 }
924
925 fn msg_emit_bus_event(&self, msg: EmitBusEvent) -> Result<(), RunError<T::Error>> {
926 if let Some(cookie) = msg.cookie {
927 let Some(bus_listener) = self.bus_listeners.get(&cookie) else {
928 return Err(RunError::UnexpectedMessageReceived(msg.into()));
929 };
930
931 if bus_listener.emit_current(msg.event) {
932 Ok(())
933 } else {
934 Err(RunError::UnexpectedMessageReceived(msg.into()))
935 }
936 } else {
937 for bus_listener in self.bus_listeners.values() {
938 bus_listener.emit_new_if_matches(msg.event);
939 }
940
941 Ok(())
942 }
943 }
944
945 fn msg_bus_listener_current_finished(
946 &mut self,
947 msg: BusListenerCurrentFinished,
948 ) -> Result<(), RunError<T::Error>> {
949 if let Some(bus_listener) = self.bus_listeners.get_mut(&msg.cookie) {
950 if bus_listener.current_finished() {
951 Ok(())
952 } else {
953 Err(RunError::UnexpectedMessageReceived(msg.into()))
954 }
955 } else {
956 Err(RunError::UnexpectedMessageReceived(msg.into()))
957 }
958 }
959
960 fn msg_abort_function_call(
961 &mut self,
962 msg: AbortFunctionCall,
963 ) -> Result<(), RunError<T::Error>> {
964 if self.protocol_version >= ProtocolVersion::V1_16 {
965 self.abort_call_handles.remove(&msg.serial);
966 Ok(())
967 } else {
968 Err(RunError::UnexpectedMessageReceived(msg.into()))
969 }
970 }
971
972 #[cfg(feature = "introspection")]
973 async fn msg_query_introspection(
974 &mut self,
975 msg: QueryIntrospection,
976 ) -> Result<(), RunError<T::Error>> {
977 if self.protocol_version >= ProtocolVersion::V1_17 {
978 let result = if let Some(introspection) = self.introspection.get(&msg.type_id) {
979 QueryIntrospectionResult::Ok(introspection.clone())
980 } else {
981 QueryIntrospectionResult::Unavailable
982 };
983
984 self.t
985 .send_and_flush(QueryIntrospectionReply {
986 serial: msg.serial,
987 result,
988 })
989 .await
990 .map_err(Into::into)
991 } else {
992 Err(RunError::UnexpectedMessageReceived(msg.into()))
993 }
994 }
995
996 #[cfg(not(feature = "introspection"))]
997 async fn msg_query_introspection(
998 &mut self,
999 msg: QueryIntrospection,
1000 ) -> Result<(), RunError<T::Error>> {
1001 self.t
1002 .send_and_flush(QueryIntrospectionReply {
1003 serial: msg.serial,
1004 result: QueryIntrospectionResult::Unavailable,
1005 })
1006 .await
1007 .map_err(Into::into)
1008 }
1009
1010 #[cfg(feature = "introspection")]
1011 fn msg_query_introspection_reply(
1012 &mut self,
1013 msg: QueryIntrospectionReply,
1014 ) -> Result<(), RunError<T::Error>> {
1015 if self.protocol_version < ProtocolVersion::V1_17 {
1016 return Err(RunError::UnexpectedMessageReceived(msg.into()));
1017 }
1018
1019 let Some(req) = self.query_introspection.remove(msg.serial) else {
1020 return Err(RunError::UnexpectedMessageReceived(msg.into()));
1021 };
1022
1023 match msg.result {
1024 QueryIntrospectionResult::Ok(introspection) => {
1025 let _ = req.reply.send(Some(introspection));
1026 }
1027
1028 QueryIntrospectionResult::Unavailable => {
1029 let _ = req.reply.send(None);
1030 }
1031 }
1032
1033 Ok(())
1034 }
1035
1036 #[cfg(not(feature = "introspection"))]
1037 fn msg_query_introspection_reply(
1038 &mut self,
1039 msg: QueryIntrospectionReply,
1040 ) -> Result<(), RunError<T::Error>> {
1041 Err(RunError::UnexpectedMessageReceived(msg.into()))
1042 }
1043
1044 async fn msg_query_service_info_reply(
1045 &mut self,
1046 msg: QueryServiceInfoReply,
1047 ) -> Result<(), RunError<T::Error>> {
1048 let Some(req) = self.query_service_info.remove(msg.serial) else {
1049 return Err(RunError::UnexpectedMessageReceived(msg.into()));
1050 };
1051
1052 debug_assert!(self.protocol_version >= ProtocolVersion::V1_17);
1053
1054 let info = match msg.result {
1055 QueryServiceInfoResult::Ok(info) => {
1056 info.deserialize().map_err(RunError::Deserialize).map(Ok)?
1057 }
1058
1059 QueryServiceInfoResult::InvalidService => Err(Error::InvalidService),
1060 };
1061
1062 self.finish_create_proxy(req, info).await?;
1063 Ok(())
1064 }
1065
1066 async fn msg_query_service_version_reply(
1067 &mut self,
1068 msg: QueryServiceVersionReply,
1069 ) -> Result<(), RunError<T::Error>> {
1070 let Some(req) = self.query_service_version.remove(msg.serial) else {
1071 return Err(RunError::UnexpectedMessageReceived(msg.into()));
1072 };
1073
1074 debug_assert!(self.protocol_version < ProtocolVersion::V1_17);
1076
1077 let info = match msg.result {
1078 QueryServiceVersionResult::Ok(version) => Ok(ServiceInfo::new(version)),
1079 QueryServiceVersionResult::InvalidService => Err(Error::InvalidService),
1080 };
1081
1082 self.finish_create_proxy(req, info).await?;
1083 Ok(())
1084 }
1085
1086 async fn finish_create_proxy(
1087 &mut self,
1088 req: CreateProxyRequest,
1089 info: Result<ServiceInfo, Error>,
1090 ) -> Result<(), RunError<T::Error>> {
1091 let info = match info {
1092 Ok(info) => info,
1093
1094 Err(e) => {
1095 let _ = req.reply.send(Err(e));
1096 return Ok(());
1097 }
1098 };
1099
1100 let (proxy, subscribe_service) =
1101 self.proxies.create(self.handle.clone(), req.service, info);
1102 let _ = req.reply.send(Ok(proxy));
1103
1104 if subscribe_service && (self.protocol_version >= ProtocolVersion::V1_18) {
1105 let serial = self.subscribe_service.insert(req.service.cookie);
1106
1107 self.t
1108 .send_and_flush(SubscribeService {
1109 serial,
1110 service_cookie: req.service.cookie,
1111 })
1112 .await?;
1113 }
1114
1115 Ok(())
1116 }
1117
1118 fn msg_subscribe_event_reply(
1119 &mut self,
1120 msg: SubscribeEventReply,
1121 ) -> Result<(), RunError<T::Error>> {
1122 if let Some(req) = self.subscribe_event.remove(msg.serial) {
1123 let res = match msg.result {
1124 SubscribeEventResult::Ok => Ok(()),
1125 SubscribeEventResult::InvalidService => Err(Error::InvalidService),
1126 };
1127
1128 let _ = req.reply.send(res);
1129 Ok(())
1130 } else {
1131 Err(RunError::UnexpectedMessageReceived(msg.into()))
1132 }
1133 }
1134
1135 fn msg_emit_event(&self, msg: EmitEvent) {
1136 self.proxies
1137 .emit(msg.service_cookie, msg.event, Instant::now(), msg.value);
1138 }
1139
1140 fn msg_service_destroyed(&mut self, msg: ServiceDestroyed) {
1141 self.proxies.remove_service(msg.service_cookie);
1142 }
1143
1144 fn msg_subscribe_service_reply(
1145 &mut self,
1146 msg: SubscribeServiceReply,
1147 ) -> Result<(), RunError<T::Error>> {
1148 if let Some(service) = self.subscribe_service.remove(msg.serial) {
1149 debug_assert!(self.protocol_version >= ProtocolVersion::V1_18);
1150
1151 if msg.result == SubscribeServiceResult::InvalidService {
1152 self.proxies.remove_service(service);
1153 }
1154
1155 Ok(())
1156 } else {
1157 Err(RunError::UnexpectedMessageReceived(msg.into()))
1158 }
1159 }
1160
1161 fn msg_subscribe_all_events(
1162 &mut self,
1163 msg: SubscribeAllEvents,
1164 ) -> Result<(), RunError<T::Error>> {
1165 if (self.protocol_version >= ProtocolVersion::V1_18) && msg.serial.is_none() {
1166 self.broker_subscriptions.subscribe_all(msg.service_cookie);
1167 Ok(())
1168 } else {
1169 Err(RunError::UnexpectedMessageReceived(msg.into()))
1170 }
1171 }
1172
1173 fn msg_subscribe_all_events_reply(
1174 &mut self,
1175 msg: SubscribeAllEventsReply,
1176 ) -> Result<(), RunError<T::Error>> {
1177 let Some(req) = self.subscribe_all_events.remove(msg.serial) else {
1178 return Err(RunError::UnexpectedMessageReceived(msg.into()));
1179 };
1180
1181 let res = match msg.result {
1182 SubscribeAllEventsResult::Ok => Ok(()),
1183 SubscribeAllEventsResult::InvalidService => Err(Error::InvalidService),
1184
1185 SubscribeAllEventsResult::NotSupported => {
1186 return Err(RunError::UnexpectedMessageReceived(msg.into()))
1187 }
1188 };
1189
1190 let _ = req.reply.send(res);
1191 Ok(())
1192 }
1193
1194 fn msg_unsubscribe_all_events(
1195 &mut self,
1196 msg: UnsubscribeAllEvents,
1197 ) -> Result<(), RunError<T::Error>> {
1198 if (self.protocol_version >= ProtocolVersion::V1_18) && msg.serial.is_none() {
1199 self.broker_subscriptions
1200 .unsubscribe_all(msg.service_cookie);
1201 Ok(())
1202 } else {
1203 Err(RunError::UnexpectedMessageReceived(msg.into()))
1204 }
1205 }
1206
1207 fn msg_unsubscribe_all_events_reply(
1208 &mut self,
1209 msg: UnsubscribeAllEventsReply,
1210 ) -> Result<(), RunError<T::Error>> {
1211 let Some(req) = self.unsubscribe_all_events.remove(msg.serial) else {
1212 return Err(RunError::UnexpectedMessageReceived(msg.into()));
1213 };
1214
1215 let res = match msg.result {
1216 UnsubscribeAllEventsResult::Ok => Ok(()),
1217 UnsubscribeAllEventsResult::InvalidService => Err(Error::InvalidService),
1218
1219 UnsubscribeAllEventsResult::NotSupported => {
1220 return Err(RunError::UnexpectedMessageReceived(msg.into()))
1221 }
1222 };
1223
1224 let _ = req.reply.send(res);
1225 Ok(())
1226 }
1227
1228 async fn handle_request(&mut self, req: HandleRequest) -> Result<(), RunError<T::Error>> {
1229 match req {
1230 HandleRequest::HandleCloned => self.req_handle_cloned(),
1231 HandleRequest::HandleDropped => self.req_handle_dropped(),
1232 HandleRequest::CreateObject(req) => self.req_create_object(req).await?,
1233 HandleRequest::DestroyObject(req) => self.req_destroy_object(req).await?,
1234 HandleRequest::CreateService(req) => self.req_create_service(req).await?,
1235 HandleRequest::DestroyService(req) => self.req_destroy_service(req).await?,
1236 HandleRequest::CallFunction(req) => self.req_call_function(req).await?,
1237 HandleRequest::CallFunctionReply(req) => self.req_call_function_reply(req).await?,
1238 HandleRequest::EmitEvent(req) => self.req_emit_event(req).await?,
1239 HandleRequest::CreateClaimedSender(req) => self.req_create_claimed_sender(req).await?,
1240 HandleRequest::CreateClaimedReceiver(req) => {
1241 self.req_create_claimed_receiver(req).await?
1242 }
1243 HandleRequest::CloseChannelEnd(req) => self.req_close_channel_end(req).await?,
1244 HandleRequest::ClaimSender(req) => self.req_claim_sender(req).await?,
1245 HandleRequest::ClaimReceiver(req) => self.req_claim_receiver(req).await?,
1246 HandleRequest::SendItem(req) => self.req_send_item(req).await?,
1247 HandleRequest::AddChannelCapacity(req) => self.req_add_channel_capacity(req).await?,
1248 HandleRequest::SyncClient(req) => self.req_sync_client(req),
1249 HandleRequest::SyncBroker(req) => self.req_sync_broker(req).await?,
1250 HandleRequest::CreateBusListener(req) => self.req_create_bus_listener(req).await?,
1251 HandleRequest::DestroyBusListener(req) => self.req_destroy_bus_listener(req).await?,
1252 HandleRequest::AddBusListenerFilter(req) => {
1253 self.req_add_bus_listener_filter(req).await?
1254 }
1255 HandleRequest::RemoveBusListenerFilter(req) => {
1256 self.req_remove_bus_listener_filter(req).await?
1257 }
1258 HandleRequest::ClearBusListenerFilters(req) => {
1259 self.req_clear_bus_listener_filters(req).await?
1260 }
1261 HandleRequest::StartBusListener(req) => self.req_start_bus_listener(req).await?,
1262 HandleRequest::StopBusListener(req) => self.req_stop_bus_listener(req).await?,
1263 HandleRequest::CreateLifetimeListener(req) => {
1264 self.req_create_lifetime_listener(req).await?
1265 }
1266 HandleRequest::GetProtocolVersion(req) => {
1267 let _ = req.send(self.protocol_version);
1268 }
1269 HandleRequest::CreateProxy(req) => self.req_create_proxy(req).await?,
1270 HandleRequest::DestroyProxy(proxy) => self.req_destroy_proxy(proxy).await?,
1271 HandleRequest::SubscribeEvent(req) => self.req_subscribe_event(req).await?,
1272 HandleRequest::UnsubscribeEvent(req) => self.req_unsubscribe_event(req).await?,
1273 HandleRequest::SubscribeAllEvents(req) => self.req_subscribe_all_events(req).await?,
1274 HandleRequest::UnsubscribeAllEvents(req) => {
1275 self.req_unsubscribe_all_events(req).await?
1276 }
1277 #[cfg(feature = "introspection")]
1278 HandleRequest::RegisterIntrospection(ty) => self.req_register_introspection(ty),
1279 #[cfg(feature = "introspection")]
1280 HandleRequest::SubmitIntrospection => self.req_submit_introspection().await?,
1281 #[cfg(feature = "introspection")]
1282 HandleRequest::QueryIntrospection(req) => self.req_query_introspection(req).await?,
1283
1284 HandleRequest::Shutdown => unreachable!(),
1286 }
1287
1288 Ok(())
1289 }
1290
    /// Increments `num_handles`; invoked for `HandleRequest::HandleCloned`.
    fn req_handle_cloned(&mut self) {
        self.num_handles += 1;
    }
1294
    /// Decrements `num_handles`; invoked for `HandleRequest::HandleDropped`.
    fn req_handle_dropped(&mut self) {
        self.num_handles -= 1;
        // Debug builds check that the count stays at or above one after the decrement.
        debug_assert!(self.num_handles >= 1);
    }
1299
1300 async fn req_create_object(
1301 &mut self,
1302 req: CreateObjectRequest,
1303 ) -> Result<(), RunError<T::Error>> {
1304 let uuid = req.uuid;
1305 let serial = self.create_object.insert(req);
1306
1307 self.t
1308 .send_and_flush(CreateObject { serial, uuid })
1309 .await
1310 .map_err(Into::into)
1311 }
1312
1313 async fn req_destroy_object(
1314 &mut self,
1315 req: DestroyObjectRequest,
1316 ) -> Result<(), RunError<T::Error>> {
1317 let serial = self.destroy_object.insert(req.reply);
1318
1319 self.t
1320 .send_and_flush(DestroyObject {
1321 serial,
1322 cookie: req.cookie,
1323 })
1324 .await
1325 .map_err(Into::into)
1326 }
1327
1328 async fn req_create_service(
1329 &mut self,
1330 req: CreateServiceRequest,
1331 ) -> Result<(), RunError<T::Error>> {
1332 let object_cookie = req.object_id.cookie;
1333 let uuid = req.service_uuid;
1334
1335 if self.protocol_version >= ProtocolVersion::V1_17 {
1336 let mut info = req.info.to_core();
1337 if self.protocol_version >= ProtocolVersion::V1_18 {
1338 info = info.set_subscribe_all(true);
1339 }
1340
1341 let serial = self.create_service.insert(req);
1342
1343 let msg = CreateService2::with_serialize_info(serial, object_cookie, uuid, info)
1344 .map_err(RunError::Serialize)?;
1345
1346 self.t.send_and_flush(msg).await.map_err(Into::into)
1347 } else {
1348 let version = req.info.version();
1349 let serial = self.create_service.insert(req);
1350
1351 self.t
1352 .send_and_flush(CreateService {
1353 serial,
1354 object_cookie,
1355 uuid,
1356 version,
1357 })
1358 .await
1359 .map_err(Into::into)
1360 }
1361 }
1362
1363 async fn req_destroy_service(
1364 &mut self,
1365 req: DestroyServiceRequest,
1366 ) -> Result<(), RunError<T::Error>> {
1367 let cookie = req.id.cookie;
1368 let serial = self.destroy_service.insert(req);
1369
1370 self.t
1371 .send_and_flush(DestroyService { serial, cookie })
1372 .await
1373 .map_err(Into::into)
1374 }
1375
1376 async fn req_call_function(
1377 &mut self,
1378 req: CallFunctionRequest,
1379 ) -> Result<(), RunError<T::Error>> {
1380 let serial = self.function_calls.insert(req.reply);
1381
1382 if self.protocol_version >= ProtocolVersion::V1_19 {
1383 self.t
1384 .send_and_flush(CallFunction2 {
1385 serial,
1386 service_cookie: req.service_cookie,
1387 function: req.function,
1388 version: req.version,
1389 value: req.value,
1390 })
1391 .await
1392 .map_err(Into::into)
1393 } else {
1394 self.t
1395 .send_and_flush(CallFunction {
1396 serial,
1397 service_cookie: req.service_cookie,
1398 function: req.function,
1399 value: req.value,
1400 })
1401 .await
1402 .map_err(Into::into)
1403 }
1404 }
1405
1406 async fn req_call_function_reply(
1407 &mut self,
1408 req: CallFunctionReplyRequest,
1409 ) -> Result<(), RunError<T::Error>> {
1410 self.abort_call_handles.remove(&req.serial);
1411
1412 self.t
1413 .send_and_flush(CallFunctionReply {
1414 serial: req.serial,
1415 result: req.result,
1416 })
1417 .await
1418 .map_err(Into::into)
1419 }
1420
1421 async fn req_emit_event(&mut self, req: EmitEventRequest) -> Result<(), RunError<T::Error>> {
1422 if self
1423 .broker_subscriptions
1424 .emit(req.service_cookie, req.event)
1425 {
1426 self.t
1427 .send_and_flush(EmitEvent {
1428 service_cookie: req.service_cookie,
1429 event: req.event,
1430 value: req.value,
1431 })
1432 .await?
1433 }
1434
1435 Ok(())
1436 }
1437
1438 async fn req_create_claimed_sender(
1439 &mut self,
1440 req: CreateClaimedSenderRequest,
1441 ) -> Result<(), RunError<T::Error>> {
1442 let serial = self.create_channel.insert(CreateChannelData::Sender(req));
1443
1444 self.t
1445 .send_and_flush(CreateChannel {
1446 serial,
1447 end: ChannelEndWithCapacity::Sender,
1448 })
1449 .await
1450 .map_err(Into::into)
1451 }
1452
1453 async fn req_create_claimed_receiver(
1454 &mut self,
1455 req: CreateClaimedReceiverRequest,
1456 ) -> Result<(), RunError<T::Error>> {
1457 let capacity = req.capacity.get();
1458 let serial = self.create_channel.insert(CreateChannelData::Receiver(req));
1459
1460 self.t
1461 .send_and_flush(CreateChannel {
1462 serial,
1463 end: ChannelEndWithCapacity::Receiver(capacity),
1464 })
1465 .await
1466 .map_err(Into::into)
1467 }
1468
1469 async fn req_close_channel_end(
1470 &mut self,
1471 req: CloseChannelEndRequest,
1472 ) -> Result<(), RunError<T::Error>> {
1473 let cookie = req.cookie;
1474 let end = req.end;
1475
1476 let serial = self.close_channel_end.insert(req);
1477
1478 self.t
1479 .send_and_flush(CloseChannelEnd {
1480 serial,
1481 cookie,
1482 end,
1483 })
1484 .await
1485 .map_err(Into::into)
1486 }
1487
1488 async fn req_claim_sender(
1489 &mut self,
1490 req: ClaimSenderRequest,
1491 ) -> Result<(), RunError<T::Error>> {
1492 let cookie = req.cookie;
1493
1494 let serial = self
1495 .claim_channel_end
1496 .insert(ClaimChannelEndData::Sender(req));
1497
1498 self.t
1499 .send_and_flush(ClaimChannelEnd {
1500 serial,
1501 cookie,
1502 end: ChannelEndWithCapacity::Sender,
1503 })
1504 .await
1505 .map_err(Into::into)
1506 }
1507
1508 async fn req_claim_receiver(
1509 &mut self,
1510 req: ClaimReceiverRequest,
1511 ) -> Result<(), RunError<T::Error>> {
1512 let cookie = req.cookie;
1513 let capacity = req.capacity.get();
1514
1515 let serial = self
1516 .claim_channel_end
1517 .insert(ClaimChannelEndData::Receiver(req));
1518
1519 self.t
1520 .send_and_flush(ClaimChannelEnd {
1521 serial,
1522 cookie,
1523 end: ChannelEndWithCapacity::Receiver(capacity),
1524 })
1525 .await
1526 .map_err(Into::into)
1527 }
1528
1529 async fn req_send_item(&mut self, req: SendItemRequest) -> Result<(), RunError<T::Error>> {
1530 debug_assert!(self.senders.contains_key(&req.cookie));
1531
1532 self.t
1533 .send_and_flush(SendItem {
1534 cookie: req.cookie,
1535 value: req.value,
1536 })
1537 .await
1538 .map_err(Into::into)
1539 }
1540
1541 async fn req_add_channel_capacity(
1542 &mut self,
1543 req: AddChannelCapacity,
1544 ) -> Result<(), RunError<T::Error>> {
1545 debug_assert!(self.receivers.contains_key(&req.cookie));
1546 self.t.send_and_flush(req).await.map_err(Into::into)
1547 }
1548
1549 fn req_sync_client(&self, req: SyncClientRequest) {
1550 let _ = req.send(Instant::now());
1551 }
1552
1553 async fn req_sync_broker(&mut self, req: SyncBrokerRequest) -> Result<(), RunError<T::Error>> {
1554 let serial = self.sync.insert(req);
1555
1556 self.t
1557 .send_and_flush(Sync { serial })
1558 .await
1559 .map_err(Into::into)
1560 }
1561
1562 async fn req_create_bus_listener(
1563 &mut self,
1564 req: CreateBusListenerRequest,
1565 ) -> Result<(), RunError<T::Error>> {
1566 let serial = self
1567 .create_bus_listener
1568 .insert(CreateBusListenerData::BusListener(req));
1569
1570 self.t
1571 .send_and_flush(CreateBusListener { serial })
1572 .await
1573 .map_err(Into::into)
1574 }
1575
1576 async fn req_destroy_bus_listener(
1577 &mut self,
1578 req: DestroyBusListenerRequest,
1579 ) -> Result<(), RunError<T::Error>> {
1580 let cookie = req.cookie;
1581 let serial = self.destroy_bus_listener.insert(req);
1582
1583 self.t
1584 .send_and_flush(DestroyBusListener { serial, cookie })
1585 .await
1586 .map_err(Into::into)
1587 }
1588
1589 async fn req_add_bus_listener_filter(
1590 &mut self,
1591 req: AddBusListenerFilter,
1592 ) -> Result<(), RunError<T::Error>> {
1593 let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
1594 return Ok(());
1595 };
1596
1597 self.t.send_and_flush(req).await?;
1598 bus_listener.add_filter(req.filter);
1599
1600 Ok(())
1601 }
1602
1603 async fn req_remove_bus_listener_filter(
1604 &mut self,
1605 req: RemoveBusListenerFilter,
1606 ) -> Result<(), RunError<T::Error>> {
1607 let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
1608 return Ok(());
1609 };
1610
1611 self.t.send_and_flush(req).await?;
1612 bus_listener.remove_filter(req.filter);
1613
1614 Ok(())
1615 }
1616
1617 async fn req_clear_bus_listener_filters(
1618 &mut self,
1619 req: ClearBusListenerFilters,
1620 ) -> Result<(), RunError<T::Error>> {
1621 let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
1622 return Ok(());
1623 };
1624
1625 self.t.send_and_flush(req).await?;
1626 bus_listener.clear_filters();
1627
1628 Ok(())
1629 }
1630
1631 async fn req_start_bus_listener(
1632 &mut self,
1633 req: StartBusListenerRequest,
1634 ) -> Result<(), RunError<T::Error>> {
1635 let cookie = req.cookie;
1636 let scope = req.scope;
1637 let serial = self.start_bus_listener.insert(req);
1638
1639 self.t
1640 .send_and_flush(StartBusListener {
1641 serial,
1642 cookie,
1643 scope,
1644 })
1645 .await
1646 .map_err(Into::into)
1647 }
1648
1649 async fn req_stop_bus_listener(
1650 &mut self,
1651 req: StopBusListenerRequest,
1652 ) -> Result<(), RunError<T::Error>> {
1653 let cookie = req.cookie;
1654 let serial = self.stop_bus_listener.insert(req);
1655
1656 self.t
1657 .send_and_flush(StopBusListener { serial, cookie })
1658 .await
1659 .map_err(Into::into)
1660 }
1661
1662 async fn req_create_lifetime_listener(
1663 &mut self,
1664 req: CreateLifetimeListenerRequest,
1665 ) -> Result<(), RunError<T::Error>> {
1666 let serial = self
1667 .create_bus_listener
1668 .insert(CreateBusListenerData::LifetimeListener(req));
1669
1670 self.t
1671 .send_and_flush(CreateBusListener { serial })
1672 .await
1673 .map_err(Into::into)
1674 }
1675
1676 async fn req_create_proxy(
1677 &mut self,
1678 req: CreateProxyRequest,
1679 ) -> Result<(), RunError<T::Error>> {
1680 let msg = if self.protocol_version >= ProtocolVersion::V1_17 {
1681 let cookie = req.service.cookie;
1682 let serial = self.query_service_info.insert(req);
1683 Message::QueryServiceInfo(QueryServiceInfo { serial, cookie })
1684 } else {
1685 let cookie = req.service.cookie;
1686 let serial = self.query_service_version.insert(req);
1687 Message::QueryServiceVersion(QueryServiceVersion { serial, cookie })
1688 };
1689
1690 self.t.send_and_flush(msg).await.map_err(Into::into)
1691 }
1692
1693 async fn req_destroy_proxy(&mut self, proxy: ProxyId) -> Result<(), RunError<T::Error>> {
1694 if let Some(res) = self.proxies.remove(proxy) {
1695 if res.unsubscribe && (self.protocol_version >= ProtocolVersion::V1_18) {
1696 self.t
1697 .send(UnsubscribeService {
1698 service_cookie: res.service,
1699 })
1700 .await?;
1701 }
1702
1703 for event in res.events {
1704 self.t
1705 .send(UnsubscribeEvent {
1706 service_cookie: res.service,
1707 event,
1708 })
1709 .await?;
1710 }
1711
1712 if res.all_events {
1713 debug_assert!(self.protocol_version >= ProtocolVersion::V1_18);
1714 self.t
1715 .send(UnsubscribeAllEvents {
1716 serial: None,
1717 service_cookie: res.service,
1718 })
1719 .await?;
1720 }
1721
1722 self.t.flush().await?;
1723 }
1724
1725 Ok(())
1726 }
1727
1728 async fn req_subscribe_event(
1729 &mut self,
1730 req: SubscribeEventRequest,
1731 ) -> Result<(), RunError<T::Error>> {
1732 match self.proxies.subscribe(req.proxy, req.event) {
1733 SubscribeResult::Forward(service_cookie) => {
1734 let event = req.event;
1735 let serial = self.subscribe_event.insert(req);
1736
1737 self.t
1738 .send_and_flush(SubscribeEvent {
1739 serial: Some(serial),
1740 service_cookie,
1741 event,
1742 })
1743 .await?;
1744 }
1745
1746 SubscribeResult::Noop => {
1747 let _ = req.reply.send(Ok(()));
1748 }
1749
1750 SubscribeResult::InvalidProxy => {
1751 let _ = req.reply.send(Err(Error::InvalidService));
1752 }
1753 }
1754
1755 Ok(())
1756 }
1757
1758 async fn req_unsubscribe_event(
1759 &mut self,
1760 req: UnsubscribeEventRequest,
1761 ) -> Result<(), RunError<T::Error>> {
1762 match self.proxies.unsubscribe(req.proxy, req.event) {
1763 SubscribeResult::Forward(service_cookie) => {
1764 self.t
1765 .send_and_flush(UnsubscribeEvent {
1766 service_cookie,
1767 event: req.event,
1768 })
1769 .await?;
1770
1771 let _ = req.reply.send(Ok(()));
1772 }
1773
1774 SubscribeResult::Noop => {
1775 let _ = req.reply.send(Ok(()));
1776 }
1777
1778 SubscribeResult::InvalidProxy => {
1779 let _ = req.reply.send(Err(Error::InvalidService));
1780 }
1781 }
1782
1783 Ok(())
1784 }
1785
1786 async fn req_subscribe_all_events(
1787 &mut self,
1788 req: SubscribeAllEventsRequest,
1789 ) -> Result<(), RunError<T::Error>> {
1790 if self.protocol_version >= ProtocolVersion::V1_18 {
1791 match self.proxies.subscribe_all(req.proxy) {
1792 SubscribeResult::Forward(service_cookie) => {
1793 let serial = self.subscribe_all_events.insert(req);
1794
1795 self.t
1796 .send_and_flush(SubscribeAllEvents {
1797 serial: Some(serial),
1798 service_cookie,
1799 })
1800 .await?;
1801 }
1802
1803 SubscribeResult::Noop => {
1804 let _ = req.reply.send(Ok(()));
1805 }
1806
1807 SubscribeResult::InvalidProxy => {
1808 let _ = req.reply.send(Err(Error::InvalidService));
1809 }
1810 }
1811 } else {
1812 let _ = req.reply.send(Err(Error::NotSupported));
1813 }
1814
1815 Ok(())
1816 }
1817
1818 async fn req_unsubscribe_all_events(
1819 &mut self,
1820 req: UnsubscribeAllEventsRequest,
1821 ) -> Result<(), RunError<T::Error>> {
1822 let Some(res) = self.proxies.unsubscribe_all(req.proxy) else {
1823 let _ = req.reply.send(Err(Error::InvalidService));
1824 return Ok(());
1825 };
1826
1827 for event in res.events {
1828 self.t
1829 .send(UnsubscribeEvent {
1830 service_cookie: res.service,
1831 event,
1832 })
1833 .await?;
1834 }
1835
1836 if res.all_events {
1837 debug_assert!(self.protocol_version >= ProtocolVersion::V1_18);
1838 let serial = self.unsubscribe_all_events.insert(req);
1839
1840 self.t
1841 .send(UnsubscribeAllEvents {
1842 serial: Some(serial),
1843 service_cookie: res.service,
1844 })
1845 .await?;
1846 } else {
1847 let _ = req.reply.send(Ok(()));
1848 }
1849
1850 self.t.flush().await?;
1851 Ok(())
1852 }
1853
1854 #[cfg(feature = "introspection")]
1855 fn req_register_introspection(&mut self, ty: DynIntrospectable) {
1856 use std::collections::hash_map::Entry;
1857
1858 let mut types = vec![ty];
1859
1860 while let Some(ty) = types.pop() {
1861 let introspection = Introspection::from_dyn(ty);
1862
1863 let Entry::Vacant(entry) = self.introspection.entry(introspection.type_id()) else {
1864 continue;
1865 };
1866
1867 let Ok(introspection) = SerializedValue::serialize(&introspection) else {
1868 continue;
1869 };
1870
1871 ty.add_references(&mut References::new(&mut types));
1872 entry.insert(introspection);
1873 }
1874 }
1875
1876 #[cfg(feature = "introspection")]
1877 async fn req_submit_introspection(&mut self) -> Result<(), RunError<T::Error>> {
1878 use crate::core::message::RegisterIntrospection;
1879
1880 if (self.protocol_version >= ProtocolVersion::V1_17) && !self.introspection.is_empty() {
1881 let type_ids = self.introspection.keys().copied().collect();
1882
1883 let register_introspection = RegisterIntrospection::with_serialize_type_ids(&type_ids)
1884 .map_err(RunError::Serialize)?;
1885
1886 self.t
1887 .send_and_flush(register_introspection)
1888 .await
1889 .map_err(Into::into)
1890 } else {
1891 Ok(())
1892 }
1893 }
1894
1895 #[cfg(feature = "introspection")]
1896 async fn req_query_introspection(
1897 &mut self,
1898 req: QueryIntrospectionRequest,
1899 ) -> Result<(), RunError<T::Error>> {
1900 if let Some(introspection) = self.introspection.get(&req.type_id) {
1901 let _ = req.reply.send(Some(introspection.clone()));
1902 Ok(())
1903 } else if self.protocol_version >= ProtocolVersion::V1_17 {
1904 let type_id = req.type_id;
1905 let serial = self.query_introspection.insert(req);
1906
1907 self.t
1908 .send_and_flush(QueryIntrospection { serial, type_id })
1909 .await
1910 .map_err(Into::into)
1911 } else {
1912 let _ = req.reply.send(None);
1913 Ok(())
1914 }
1915 }
1916
1917 async fn abort_function_call(&mut self, serial: u32) -> Result<(), RunError<T::Error>> {
1918 self.function_calls.abort(serial);
1919
1920 if self.protocol_version >= ProtocolVersion::V1_16 {
1921 self.t.send_and_flush(AbortFunctionCall { serial }).await?;
1922 }
1923
1924 Ok(())
1925 }
1926}
1927
/// Pending request stored in `create_channel` under the serial of an outgoing
/// `CreateChannel` message; the variant records which channel end was requested.
#[derive(Debug)]
enum CreateChannelData {
    /// Request that asked for the sender end.
    Sender(CreateClaimedSenderRequest),
    /// Request that asked for the receiver end.
    Receiver(CreateClaimedReceiverRequest),
}
1933
/// Pending request stored in `claim_channel_end` under the serial of an outgoing
/// `ClaimChannelEnd` message; the variant records which channel end is claimed.
#[derive(Debug)]
enum ClaimChannelEndData {
    /// Claim of the sender end.
    Sender(ClaimSenderRequest),
    /// Claim of the receiver end.
    Receiver(ClaimReceiverRequest),
}
1939
/// State of a channel's sender end.
// NOTE(review): presumably the value type of the `senders` map — confirm against its declaration.
#[derive(Debug)]
enum SenderState {
    /// Holds a one-shot sender for a `Result` carrying an unbounded `u32` receiver plus a `u32`.
    // NOTE(review): the `u32` values look like capacity amounts — confirm.
    Pending(oneshot::Sender<Result<(mpsc::UnboundedReceiver<u32>, u32), Error>>),
    /// Holds the unbounded sender half for `u32` values.
    Established(mpsc::UnboundedSender<u32>),
    /// Carries no data.
    // NOTE(review): by its name, marks that the receiver end was closed — confirm.
    ReceiverClosed,
}
1946
/// State of a channel's receiver end.
// NOTE(review): presumably the value type of the `receivers` map — confirm against its declaration.
#[derive(Debug)]
enum ReceiverState {
    /// Holds a one-shot sender for a `Result` carrying an unbounded `SerializedValue` receiver.
    Pending(oneshot::Sender<Result<mpsc::UnboundedReceiver<SerializedValue>, Error>>),
    /// Holds the unbounded sender half for `SerializedValue` items.
    Established(mpsc::UnboundedSender<SerializedValue>),
    /// Carries no data.
    // NOTE(review): by its name, marks that the sender end was closed — confirm.
    SenderClosed,
}
1953
/// Pending request stored in `create_bus_listener` under the serial of an outgoing
/// `CreateBusListener` message; the variant records which kind of listener asked for it.
#[derive(Debug)]
enum CreateBusListenerData {
    /// Request for a plain bus listener.
    BusListener(CreateBusListenerRequest),
    /// Request for a lifetime listener.
    LifetimeListener(CreateLifetimeListenerRequest),
}