aldrin/
client.rs

1mod broker_subscriptions;
2mod proxies;
3mod select;
4
5use crate::bus_listener::{BusListener, BusListenerHandle};
6#[cfg(feature = "introspection")]
7use crate::core::introspection::{DynIntrospectable, Introspection, References};
8use crate::core::message::{
9    AbortFunctionCall, AddBusListenerFilter, AddChannelCapacity, BusListenerCurrentFinished,
10    CallFunction, CallFunction2, CallFunctionReply, CallFunctionResult, ChannelEndClaimed,
11    ChannelEndClosed, ClaimChannelEnd, ClaimChannelEndReply, ClaimChannelEndResult,
12    ClearBusListenerFilters, CloseChannelEnd, CloseChannelEndReply, CloseChannelEndResult,
13    Connect2, ConnectData, ConnectResult, CreateBusListener, CreateBusListenerReply, CreateChannel,
14    CreateChannelReply, CreateObject, CreateObjectReply, CreateObjectResult, CreateService,
15    CreateService2, CreateServiceReply, CreateServiceResult, DestroyBusListener,
16    DestroyBusListenerReply, DestroyBusListenerResult, DestroyObject, DestroyObjectReply,
17    DestroyObjectResult, DestroyService, DestroyServiceReply, DestroyServiceResult, EmitBusEvent,
18    EmitEvent, ItemReceived, Message, QueryIntrospection, QueryIntrospectionReply,
19    QueryIntrospectionResult, QueryServiceInfo, QueryServiceInfoReply, QueryServiceInfoResult,
20    QueryServiceVersion, QueryServiceVersionReply, QueryServiceVersionResult,
21    RemoveBusListenerFilter, SendItem, ServiceDestroyed, Shutdown, StartBusListener,
22    StartBusListenerReply, StartBusListenerResult, StopBusListener, StopBusListenerReply,
23    StopBusListenerResult, SubscribeAllEvents, SubscribeAllEventsReply, SubscribeAllEventsResult,
24    SubscribeEvent, SubscribeEventReply, SubscribeEventResult, SubscribeService,
25    SubscribeServiceReply, SubscribeServiceResult, Sync, SyncReply, UnsubscribeAllEvents,
26    UnsubscribeAllEventsReply, UnsubscribeAllEventsResult, UnsubscribeEvent, UnsubscribeService,
27};
28use crate::core::transport::{AsyncTransport, AsyncTransportExt};
29#[cfg(feature = "introspection")]
30use crate::core::TypeId;
31use crate::core::{
32    BusListenerCookie, ChannelCookie, ChannelEnd, ChannelEndWithCapacity, Deserialize, ObjectId,
33    ProtocolVersion, Serialize, SerializedValue, SerializedValueSlice, ServiceCookie, ServiceId,
34    ServiceInfo,
35};
36use crate::error::{ConnectError, RunError};
37use crate::function_call_map::FunctionCallMap;
38#[cfg(feature = "introspection")]
39use crate::handle::request::QueryIntrospectionRequest;
40use crate::handle::request::{
41    CallFunctionReplyRequest, CallFunctionRequest, ClaimReceiverRequest, ClaimSenderRequest,
42    CloseChannelEndRequest, CreateBusListenerRequest, CreateClaimedReceiverRequest,
43    CreateClaimedSenderRequest, CreateLifetimeListenerRequest, CreateObjectRequest,
44    CreateProxyRequest, CreateServiceRequest, DestroyBusListenerRequest, DestroyObjectRequest,
45    DestroyServiceRequest, EmitEventRequest, HandleRequest, SendItemRequest,
46    StartBusListenerRequest, StopBusListenerRequest, SubscribeAllEventsRequest,
47    SubscribeEventRequest, SyncBrokerRequest, SyncClientRequest, UnsubscribeAllEventsRequest,
48    UnsubscribeEventRequest,
49};
50use crate::lifetime::LifetimeListener;
51use crate::low_level::{
52    PendingReceiver, PendingSender, ProxyId, RawCall, Service, UnclaimedReceiver, UnclaimedSender,
53};
54use crate::serial_map::SerialMap;
55use crate::{Error, Handle, Object};
56use broker_subscriptions::BrokerSubscriptions;
57use futures_channel::{mpsc, oneshot};
58use proxies::{Proxies, SubscribeResult};
59use select::{Select, Selected};
60use std::collections::HashMap;
61use std::mem;
62use std::time::Instant;
63
/// Protocol version this client announces to the broker.
///
/// Its major and minor components are sent in the initial `Connect2` message, and a negotiated
/// version greater than this one is rejected as incompatible.
const PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::V1_19;
65
66/// Aldrin client used to connect to a broker.
67///
68/// This is the first entry point to Aldrin. A [`Client`] is used to establish a connection to an
69/// Aldrin broker. Afterwards, it should be turned into a [`Future`](std::future::Future) with the
70/// [`run`](Client::run) method, which must then be continuously polled and run to completion.
71///
72/// All interaction with a [`Client`] happens asynchronously through one or more
73/// [`Handle`s](Handle), which must be acquired with [`Client::handle`] before calling
74/// [`Client::run`].
75///
76/// # Shutdown
77///
78/// A [`Client`] will automatically shut down when the last [`Handle`] has been dropped. Keep in
79/// mind that several other types (such as e.g. [`Object`]) keep an internal [`Handle`]. Use
80/// [`Handle::shutdown`] to shut down the [`Client`] manually.
81#[derive(Debug)]
82#[must_use = "clients do nothing unless you `.await` or poll `Client::run()`"]
83pub struct Client<T>
84where
85    T: AsyncTransport + Unpin,
86{
87    select: Select,
88    t: T,
89    protocol_version: ProtocolVersion,
90    recv: mpsc::UnboundedReceiver<HandleRequest>,
91    handle: Handle,
92    num_handles: usize,
93    create_object: SerialMap<CreateObjectRequest>,
94    destroy_object: SerialMap<oneshot::Sender<DestroyObjectResult>>,
95    create_service: SerialMap<CreateServiceRequest>,
96    destroy_service: SerialMap<DestroyServiceRequest>,
97    function_calls: FunctionCallMap,
98    services: HashMap<ServiceCookie, mpsc::UnboundedSender<RawCall>>,
99    broker_subscriptions: BrokerSubscriptions,
100    create_channel: SerialMap<CreateChannelData>,
101    close_channel_end: SerialMap<CloseChannelEndRequest>,
102    claim_channel_end: SerialMap<ClaimChannelEndData>,
103    senders: HashMap<ChannelCookie, SenderState>,
104    receivers: HashMap<ChannelCookie, ReceiverState>,
105    sync: SerialMap<SyncBrokerRequest>,
106    create_bus_listener: SerialMap<CreateBusListenerData>,
107    destroy_bus_listener: SerialMap<DestroyBusListenerRequest>,
108    start_bus_listener: SerialMap<StartBusListenerRequest>,
109    stop_bus_listener: SerialMap<StopBusListenerRequest>,
110    bus_listeners: HashMap<BusListenerCookie, BusListenerHandle>,
111    abort_call_handles: HashMap<u32, oneshot::Sender<()>>,
112    query_service_info: SerialMap<CreateProxyRequest>,
113    query_service_version: SerialMap<CreateProxyRequest>,
114    subscribe_event: SerialMap<SubscribeEventRequest>,
115    subscribe_service: SerialMap<ServiceCookie>,
116    subscribe_all_events: SerialMap<SubscribeAllEventsRequest>,
117    unsubscribe_all_events: SerialMap<UnsubscribeAllEventsRequest>,
118    proxies: Proxies,
119    #[cfg(feature = "introspection")]
120    introspection: HashMap<TypeId, SerializedValue>,
121    #[cfg(feature = "introspection")]
122    query_introspection: SerialMap<QueryIntrospectionRequest>,
123}
124
125impl<T> Client<T>
126where
127    T: AsyncTransport + Unpin,
128{
129    /// Creates a client and connects to an Aldrin broker.
130    ///
131    /// If you need to send custom data to the broker, then use
132    /// [`connect_with_data`](Self::connect_with_data) instead. This function sends `()` and
133    /// discards the broker's data.
134    ///
135    /// After creating a client, it must be continuously polled and run to completion with the
136    /// [`run`](Client::run) method.
137    ///
138    /// # Examples
139    ///
140    /// ```
141    /// use aldrin::Client;
142    ///
143    /// # #[tokio::main]
144    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
145    /// # let broker = aldrin_test::tokio::TestBroker::new();
146    /// # let mut handle = broker.clone();
147    /// # let (async_transport, t2) = aldrin::core::channel::unbounded();
148    /// # let conn = tokio::spawn(async move { handle.connect(t2).await });
149    /// // Create an AsyncTransport for connecting to the broker.
150    /// // let async_transport = ...
151    ///
152    /// // Connect to the broker:
153    /// let client = Client::connect(async_transport).await?;
154    /// # tokio::spawn(conn.await??.run());
155    ///
156    /// // Acquire a handle and spawn the client:
157    /// let handle = client.handle().clone();
158    /// let join = tokio::spawn(client.run());
159    ///
160    /// // The client is now fully connected and can be interacted with through the handle.
161    ///
162    /// // Shut down client:
163    /// handle.shutdown();
164    /// join.await??;
165    /// # Ok(())
166    /// # }
167    /// ```
168    pub async fn connect(t: T) -> Result<Self, ConnectError<T::Error>> {
169        let (client, _) = Self::connect_with_data::<()>(t, None).await?;
170        Ok(client)
171    }
172
173    /// Creates a client and connects to an Aldrin broker. Allows to send and receive custom data.
174    ///
175    /// After creating a client, it must be continuously polled and run to completion with the
176    /// [`run`](Client::run) method.
177    pub async fn connect_with_data<D: Serialize + ?Sized>(
178        mut t: T,
179        data: Option<&D>,
180    ) -> Result<(Self, Option<SerializedValue>), ConnectError<T::Error>> {
181        let mut connect_data = ConnectData::new();
182
183        if let Some(data) = data {
184            connect_data.serialize_user(data)?;
185        }
186
187        let connect = Connect2::with_serialize_data(
188            PROTOCOL_VERSION.major(),
189            PROTOCOL_VERSION.minor(),
190            &connect_data,
191        )?;
192
193        t.send_and_flush(connect)
194            .await
195            .map_err(ConnectError::Transport)?;
196
197        let connect_reply = match t.receive().await.map_err(ConnectError::Transport)? {
198            Message::ConnectReply2(connect_reply) => connect_reply,
199            msg => return Err(ConnectError::UnexpectedMessageReceived(msg)),
200        };
201
202        let connect_reply_data = connect_reply.deserialize_connect_data()?;
203
204        let minor_version = match connect_reply.result {
205            ConnectResult::Ok(minor_version) => minor_version,
206            ConnectResult::Rejected => return Err(ConnectError::Rejected(connect_reply_data.user)),
207            ConnectResult::IncompatibleVersion => return Err(ConnectError::IncompatibleVersion),
208        };
209
210        let protocol_version = ProtocolVersion::new(PROTOCOL_VERSION.major(), minor_version)
211            .map_err(|_| ConnectError::IncompatibleVersion)?;
212
213        if protocol_version > PROTOCOL_VERSION {
214            return Err(ConnectError::IncompatibleVersion);
215        }
216
217        let (send, recv) = mpsc::unbounded();
218        let client = Self {
219            select: Select::new(),
220            t,
221            protocol_version,
222            recv,
223            handle: Handle::new(send),
224            num_handles: 1,
225            create_object: SerialMap::new(),
226            destroy_object: SerialMap::new(),
227            create_service: SerialMap::new(),
228            destroy_service: SerialMap::new(),
229            function_calls: FunctionCallMap::new(),
230            services: HashMap::new(),
231            broker_subscriptions: BrokerSubscriptions::new(),
232            create_channel: SerialMap::new(),
233            close_channel_end: SerialMap::new(),
234            claim_channel_end: SerialMap::new(),
235            senders: HashMap::new(),
236            receivers: HashMap::new(),
237            sync: SerialMap::new(),
238            create_bus_listener: SerialMap::new(),
239            destroy_bus_listener: SerialMap::new(),
240            start_bus_listener: SerialMap::new(),
241            stop_bus_listener: SerialMap::new(),
242            bus_listeners: HashMap::new(),
243            abort_call_handles: HashMap::new(),
244            query_service_info: SerialMap::new(),
245            query_service_version: SerialMap::new(),
246            subscribe_event: SerialMap::new(),
247            subscribe_service: SerialMap::new(),
248            subscribe_all_events: SerialMap::new(),
249            unsubscribe_all_events: SerialMap::new(),
250            proxies: Proxies::new(),
251            #[cfg(feature = "introspection")]
252            introspection: HashMap::new(),
253            #[cfg(feature = "introspection")]
254            query_introspection: SerialMap::new(),
255        };
256
257        Ok((client, connect_reply_data.user))
258    }
259
260    /// Creates a client and connects to an Aldrin broker. Allows to send and receive custom data.
261    ///
262    /// After creating a client, it must be continuously polled and run to completion with the
263    /// [`run`](Client::run) method.
264    ///
265    /// # Examples
266    ///
267    /// ```
268    /// use aldrin::Client;
269    ///
270    /// # #[tokio::main]
271    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
272    /// # let broker = aldrin_test::tokio::TestBroker::new();
273    /// # let mut handle = broker.clone();
274    /// # let (async_transport, t2) = aldrin::core::channel::unbounded();
275    /// # tokio::spawn(async move { handle.connect(t2).await });
276    /// // Create an AsyncTransport for connecting to the broker.
277    /// // let async_transport = ...
278    ///
279    /// // Connect to the broker, sending some custom data.
280    /// let (client, data) = Client::connect_with_data(async_transport, Some("Hi!")).await?;
281    ///
282    /// println!("Data the broker sent back: {:?}.", data);
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub async fn connect_with_data_and_deserialize<D1, D2>(
287        t: T,
288        data: Option<&D1>,
289    ) -> Result<(Self, Option<D2>), ConnectError<T::Error>>
290    where
291        D1: Serialize + ?Sized,
292        D2: Deserialize,
293    {
294        let (client, data) = Self::connect_with_data(t, data).await?;
295        let data = data
296            .as_deref()
297            .map(SerializedValueSlice::deserialize)
298            .transpose()?;
299
300        Ok((client, data))
301    }
302
303    /// Returns a handle to the client.
304    ///
305    /// After creating the [`Client`], [`Handle`s](Handle) are the primary entry point for
306    /// interacting with it.
307    ///
308    /// When the last [`Handle`] is dropped, the [`Client`] will automatically shut down.
309    pub fn handle(&self) -> &Handle {
310        &self.handle
311    }
312
313    /// Returns the protocol version that was negotiated with the broker.
314    pub fn protocol_version(self) -> ProtocolVersion {
315        self.protocol_version
316    }
317
318    /// Runs the client until it shuts down.
319    ///
320    /// After creating a [`Client`] it is important to run it before calling any method on a
321    /// [`Handle`].
322    ///
323    /// This is a long running method, that will only complete once the [`Client`] has shut down. It
324    /// should ideally be spawned on a dedicated task (not for performance or technical reasons, but
325    /// for ergonomics).
326    ///
327    /// # Shutdown
328    ///
329    /// A running [`Client`] can be shut down manually with [`Handle::shutdown`]. It will also
330    /// automatically shut down when the last [`Handle`] has been dropped. Be aware, that some
331    /// types (such as e.g. [`Service`]) hold an internal [`Handle`] and will thus keep the
332    /// [`Client`] running. [`Client`s](Client) can also be instructed by the broker to shut down.
333    pub async fn run(mut self) -> Result<(), RunError<T::Error>> {
334        loop {
335            match self.select().await {
336                Selected::Transport(Ok(Message::Shutdown(Shutdown))) => {
337                    self.t.send_and_flush(Shutdown).await?;
338                    return Ok(());
339                }
340
341                Selected::Transport(Ok(msg)) => self.handle_message(msg).await?,
342                Selected::Transport(Err(e)) => return Err(e.into()),
343                Selected::Handle(HandleRequest::Shutdown) => break,
344                Selected::Handle(req) => self.handle_request(req).await?,
345                Selected::AbortFunctionCall(serial) => self.abort_function_call(serial).await?,
346            }
347
348            if self.num_handles == 1 {
349                break;
350            }
351        }
352
353        self.t.send_and_flush(Shutdown).await?;
354        self.drain_transport().await?;
355        Ok(())
356    }
357
358    async fn select(&mut self) -> Selected<T> {
359        self.select
360            .select(&mut self.t, &mut self.recv, &mut self.function_calls)
361            .await
362    }
363
364    async fn drain_transport(&mut self) -> Result<(), RunError<T::Error>> {
365        loop {
366            if let Message::Shutdown(Shutdown) = self.t.receive().await? {
367                return Ok(());
368            }
369        }
370    }
371
372    async fn handle_message(&mut self, msg: Message) -> Result<(), RunError<T::Error>> {
373        match msg {
374            Message::CreateObjectReply(msg) => self.msg_create_object_reply(msg)?,
375            Message::DestroyObjectReply(msg) => self.msg_destroy_object_reply(msg),
376            Message::CreateServiceReply(msg) => self.msg_create_service_reply(msg)?,
377            Message::DestroyServiceReply(msg) => self.msg_destroy_service_reply(msg),
378            Message::CallFunction(msg) => self.msg_call_function(msg).await?,
379            Message::CallFunction2(msg) => self.msg_call_function2(msg).await?,
380            Message::CallFunctionReply(msg) => self.msg_call_function_reply(msg),
381            Message::SubscribeEvent(msg) => self.msg_subscribe_event(msg),
382            Message::UnsubscribeEvent(msg) => self.msg_unsubscribe_event(msg),
383            Message::CreateChannelReply(msg) => self.msg_create_channel_reply(msg)?,
384            Message::CloseChannelEndReply(msg) => self.msg_close_channel_end_reply(msg)?,
385            Message::ChannelEndClosed(msg) => self.msg_channel_end_closed(msg)?,
386            Message::ClaimChannelEndReply(msg) => self.msg_claim_channel_end_reply(msg)?,
387            Message::ChannelEndClaimed(msg) => self.msg_channel_end_claimed(msg)?,
388            Message::ItemReceived(msg) => self.msg_item_received(msg)?,
389            Message::AddChannelCapacity(msg) => self.msg_add_channel_capacity(msg)?,
390            Message::SyncReply(msg) => self.msg_sync_reply(msg)?,
391            Message::CreateBusListenerReply(msg) => self.msg_create_bus_listener_reply(msg)?,
392            Message::DestroyBusListenerReply(msg) => self.msg_destroy_bus_listener_reply(msg)?,
393            Message::StartBusListenerReply(msg) => self.msg_start_bus_listener_reply(msg)?,
394            Message::StopBusListenerReply(msg) => self.msg_stop_bus_listener_reply(msg)?,
395            Message::EmitBusEvent(msg) => self.msg_emit_bus_event(msg)?,
396
397            Message::BusListenerCurrentFinished(msg) => {
398                self.msg_bus_listener_current_finished(msg)?
399            }
400
401            Message::AbortFunctionCall(msg) => self.msg_abort_function_call(msg)?,
402            Message::QueryIntrospection(msg) => self.msg_query_introspection(msg).await?,
403            Message::QueryIntrospectionReply(msg) => self.msg_query_introspection_reply(msg)?,
404            Message::QueryServiceInfoReply(msg) => self.msg_query_service_info_reply(msg).await?,
405
406            Message::QueryServiceVersionReply(msg) => {
407                self.msg_query_service_version_reply(msg).await?
408            }
409
410            Message::SubscribeEventReply(msg) => self.msg_subscribe_event_reply(msg)?,
411            Message::EmitEvent(msg) => self.msg_emit_event(msg),
412            Message::ServiceDestroyed(msg) => self.msg_service_destroyed(msg),
413            Message::SubscribeServiceReply(msg) => self.msg_subscribe_service_reply(msg)?,
414            Message::SubscribeAllEvents(msg) => self.msg_subscribe_all_events(msg)?,
415            Message::SubscribeAllEventsReply(msg) => self.msg_subscribe_all_events_reply(msg)?,
416            Message::UnsubscribeAllEvents(msg) => self.msg_unsubscribe_all_events(msg)?,
417
418            Message::UnsubscribeAllEventsReply(msg) => {
419                self.msg_unsubscribe_all_events_reply(msg)?
420            }
421
422            Message::Connect(_)
423            | Message::ConnectReply(_)
424            | Message::CreateObject(_)
425            | Message::DestroyObject(_)
426            | Message::CreateService(_)
427            | Message::DestroyService(_)
428            | Message::QueryServiceVersion(_)
429            | Message::CreateChannel(_)
430            | Message::CloseChannelEnd(_)
431            | Message::ClaimChannelEnd(_)
432            | Message::SendItem(_)
433            | Message::Sync(_)
434            | Message::CreateBusListener(_)
435            | Message::DestroyBusListener(_)
436            | Message::AddBusListenerFilter(_)
437            | Message::RemoveBusListenerFilter(_)
438            | Message::ClearBusListenerFilters(_)
439            | Message::StartBusListener(_)
440            | Message::StopBusListener(_)
441            | Message::Connect2(_)
442            | Message::ConnectReply2(_)
443            | Message::RegisterIntrospection(_)
444            | Message::CreateService2(_)
445            | Message::QueryServiceInfo(_)
446            | Message::SubscribeService(_)
447            | Message::UnsubscribeService(_) => {
448                return Err(RunError::UnexpectedMessageReceived(msg))
449            }
450
451            Message::Shutdown(Shutdown) => unreachable!(), // Handled in run().
452        }
453
454        Ok(())
455    }
456
457    fn msg_create_object_reply(
458        &mut self,
459        msg: CreateObjectReply,
460    ) -> Result<(), RunError<T::Error>> {
461        let Some(req) = self.create_object.remove(msg.serial) else {
462            return Err(RunError::UnexpectedMessageReceived(msg.into()));
463        };
464
465        let reply = match msg.result {
466            CreateObjectResult::Ok(cookie) => Ok(Object::new_impl(
467                ObjectId::new(req.uuid, cookie),
468                self.handle.clone(),
469            )),
470
471            CreateObjectResult::DuplicateObject => Err(Error::DuplicateObject),
472        };
473
474        let _ = req.reply.send(reply);
475        Ok(())
476    }
477
478    fn msg_destroy_object_reply(&mut self, msg: DestroyObjectReply) {
479        if let Some(send) = self.destroy_object.remove(msg.serial) {
480            let _ = send.send(msg.result);
481        }
482    }
483
484    fn msg_create_service_reply(
485        &mut self,
486        msg: CreateServiceReply,
487    ) -> Result<(), RunError<T::Error>> {
488        let Some(req) = self.create_service.remove(msg.serial) else {
489            return Err(RunError::UnexpectedMessageReceived(msg.into()));
490        };
491
492        let reply = match msg.result {
493            CreateServiceResult::Ok(cookie) => {
494                let (send, function_calls) = mpsc::unbounded();
495                let dup = self.services.insert(cookie, send);
496                debug_assert!(dup.is_none());
497
498                Ok(Service::new_impl(
499                    ServiceId::new(req.object_id, req.service_uuid, cookie),
500                    req.info,
501                    self.handle.clone(),
502                    function_calls,
503                ))
504            }
505
506            CreateServiceResult::DuplicateService => Err(Error::DuplicateService),
507            CreateServiceResult::InvalidObject => Err(Error::InvalidObject),
508            CreateServiceResult::ForeignObject => unreachable!(),
509        };
510
511        let _ = req.reply.send(reply);
512        Ok(())
513    }
514
515    fn msg_destroy_service_reply(&mut self, msg: DestroyServiceReply) {
516        let Some(req) = self.destroy_service.remove(msg.serial) else {
517            return;
518        };
519
520        let reply = match msg.result {
521            DestroyServiceResult::Ok => {
522                let contained = self.services.remove(&req.id.cookie);
523                debug_assert!(contained.is_some());
524                self.broker_subscriptions.remove_service(req.id.cookie);
525                Ok(())
526            }
527
528            DestroyServiceResult::InvalidService => Err(Error::InvalidService),
529            DestroyServiceResult::ForeignObject => unreachable!(),
530        };
531
532        let _ = req.reply.send(reply);
533    }
534
535    async fn msg_call_function(&mut self, msg: CallFunction) -> Result<(), RunError<T::Error>> {
536        let msg = CallFunction2 {
537            serial: msg.serial,
538            service_cookie: msg.service_cookie,
539            function: msg.function,
540            version: None,
541            value: msg.value,
542        };
543
544        self.msg_call_function2(msg).await
545    }
546
547    async fn msg_call_function2(&mut self, msg: CallFunction2) -> Result<(), RunError<T::Error>> {
548        let send = self
549            .services
550            .get_mut(&msg.service_cookie)
551            .expect("inconsistent state");
552
553        let (abort_send, abort_recv) = oneshot::channel();
554
555        let req = RawCall {
556            serial: msg.serial,
557            function: msg.function,
558            version: msg.version,
559            timestamp: Instant::now(),
560            args: msg.value,
561            aborted: abort_recv,
562        };
563
564        if send.unbounded_send(req).is_ok() {
565            let dup = self.abort_call_handles.insert(msg.serial, abort_send);
566            assert!(dup.is_none());
567        } else {
568            self.t
569                .send_and_flush(CallFunctionReply {
570                    serial: msg.serial,
571                    result: CallFunctionResult::InvalidService,
572                })
573                .await?;
574        }
575
576        Ok(())
577    }
578
579    fn msg_call_function_reply(&mut self, msg: CallFunctionReply) {
580        if let Some(send) = self.function_calls.remove(msg.serial) {
581            let _ = send.send(Ok((msg.result, Instant::now())));
582        }
583    }
584
585    fn msg_subscribe_event(&mut self, msg: SubscribeEvent) {
586        self.broker_subscriptions
587            .subscribe(msg.service_cookie, msg.event);
588    }
589
590    fn msg_unsubscribe_event(&mut self, msg: UnsubscribeEvent) {
591        self.broker_subscriptions
592            .unsubscribe(msg.service_cookie, msg.event);
593    }
594
595    fn msg_create_channel_reply(
596        &mut self,
597        msg: CreateChannelReply,
598    ) -> Result<(), RunError<T::Error>> {
599        match self.create_channel.remove(msg.serial) {
600            Some(CreateChannelData::Sender(reply)) => {
601                let (send, recv) = oneshot::channel();
602                let sender = PendingSender::new(self.handle.clone(), msg.cookie, recv);
603                let receiver = UnclaimedReceiver::new(self.handle.clone(), msg.cookie);
604                let dup = self.senders.insert(msg.cookie, SenderState::Pending(send));
605                debug_assert!(dup.is_none());
606                let _ = reply.send((sender, receiver));
607                Ok(())
608            }
609
610            Some(CreateChannelData::Receiver(req)) => {
611                let (send, recv) = oneshot::channel();
612                let sender = UnclaimedSender::new(self.handle.clone(), msg.cookie);
613                let receiver =
614                    PendingReceiver::new(self.handle.clone(), msg.cookie, recv, req.capacity);
615                let dup = self
616                    .receivers
617                    .insert(msg.cookie, ReceiverState::Pending(send));
618                debug_assert!(dup.is_none());
619                let _ = req.reply.send((sender, receiver));
620                Ok(())
621            }
622
623            None => Err(RunError::UnexpectedMessageReceived(msg.into())),
624        }
625    }
626
627    fn msg_close_channel_end_reply(
628        &mut self,
629        msg: CloseChannelEndReply,
630    ) -> Result<(), RunError<T::Error>> {
631        let Some(req) = self.close_channel_end.remove(msg.serial) else {
632            return Err(RunError::UnexpectedMessageReceived(msg.into()));
633        };
634
635        if req.claimed {
636            match req.end {
637                ChannelEnd::Sender => {
638                    let contained = self.senders.remove(&req.cookie);
639                    debug_assert!(contained.is_some());
640                }
641
642                ChannelEnd::Receiver => {
643                    let contained = self.receivers.remove(&req.cookie);
644                    debug_assert!(contained.is_some());
645                }
646            }
647        }
648
649        let res = match msg.result {
650            CloseChannelEndResult::Ok => Ok(()),
651
652            CloseChannelEndResult::InvalidChannel | CloseChannelEndResult::ForeignChannel => {
653                Err(Error::InvalidChannel)
654            }
655        };
656
657        let _ = req.reply.send(res);
658        Ok(())
659    }
660
661    fn msg_channel_end_closed(&mut self, msg: ChannelEndClosed) -> Result<(), RunError<T::Error>> {
662        match msg.end {
663            ChannelEnd::Sender => {
664                let receiver = self
665                    .receivers
666                    .get_mut(&msg.cookie)
667                    .map(|receiver| mem::replace(receiver, ReceiverState::SenderClosed));
668
669                match receiver {
670                    Some(ReceiverState::Pending(send)) => {
671                        let _ = send.send(Err(Error::InvalidChannel));
672                        Ok(())
673                    }
674
675                    Some(ReceiverState::Established(_)) => Ok(()),
676
677                    Some(ReceiverState::SenderClosed) | None => {
678                        Err(RunError::UnexpectedMessageReceived(msg.into()))
679                    }
680                }
681            }
682
683            ChannelEnd::Receiver => {
684                let sender = self
685                    .senders
686                    .get_mut(&msg.cookie)
687                    .map(|sender| mem::replace(sender, SenderState::ReceiverClosed));
688
689                match sender {
690                    Some(SenderState::Pending(send)) => {
691                        let _ = send.send(Err(Error::InvalidChannel));
692                        Ok(())
693                    }
694
695                    Some(SenderState::Established(_)) => Ok(()),
696
697                    Some(SenderState::ReceiverClosed) | None => {
698                        Err(RunError::UnexpectedMessageReceived(msg.into()))
699                    }
700                }
701            }
702        }
703    }
704
705    fn msg_claim_channel_end_reply(
706        &mut self,
707        msg: ClaimChannelEndReply,
708    ) -> Result<(), RunError<T::Error>> {
709        let Some(req) = self.claim_channel_end.remove(msg.serial) else {
710            return Err(RunError::UnexpectedMessageReceived(msg.into()));
711        };
712
713        match req {
714            ClaimChannelEndData::Sender(req) => match msg.result {
715                ClaimChannelEndResult::SenderClaimed(capacity) => {
716                    let (send, recv) = mpsc::unbounded();
717                    let dup = self
718                        .senders
719                        .insert(req.cookie, SenderState::Established(send));
720                    debug_assert!(dup.is_none());
721                    let _ = req.reply.send(Ok((recv, capacity)));
722                }
723
724                ClaimChannelEndResult::ReceiverClaimed => {
725                    return Err(RunError::UnexpectedMessageReceived(msg.into()))
726                }
727
728                ClaimChannelEndResult::InvalidChannel | ClaimChannelEndResult::AlreadyClaimed => {
729                    let _ = req.reply.send(Err(Error::InvalidChannel));
730                }
731            },
732
733            ClaimChannelEndData::Receiver(req) => match msg.result {
734                ClaimChannelEndResult::SenderClaimed(_) => {
735                    return Err(RunError::UnexpectedMessageReceived(msg.into()))
736                }
737
738                ClaimChannelEndResult::ReceiverClaimed => {
739                    let (send, recv) = mpsc::unbounded();
740                    let dup = self
741                        .receivers
742                        .insert(req.cookie, ReceiverState::Established(send));
743                    debug_assert!(dup.is_none());
744                    let _ = req.reply.send(Ok((recv, req.capacity)));
745                }
746
747                ClaimChannelEndResult::InvalidChannel | ClaimChannelEndResult::AlreadyClaimed => {
748                    let _ = req.reply.send(Err(Error::InvalidChannel));
749                }
750            },
751        }
752
753        Ok(())
754    }
755
756    fn msg_channel_end_claimed(
757        &mut self,
758        msg: ChannelEndClaimed,
759    ) -> Result<(), RunError<T::Error>> {
760        match msg.end {
761            ChannelEndWithCapacity::Sender => {
762                let Some(receiver) = self.receivers.get_mut(&msg.cookie) else {
763                    return Err(RunError::UnexpectedMessageReceived(msg.into()));
764                };
765
766                let (send, recv) = mpsc::unbounded();
767
768                match mem::replace(receiver, ReceiverState::Established(send)) {
769                    ReceiverState::Pending(send) => {
770                        let _ = send.send(Ok(recv));
771                        Ok(())
772                    }
773
774                    ReceiverState::Established(_) | ReceiverState::SenderClosed => {
775                        Err(RunError::UnexpectedMessageReceived(msg.into()))
776                    }
777                }
778            }
779
780            ChannelEndWithCapacity::Receiver(capacity) => {
781                let Some(sender) = self.senders.get_mut(&msg.cookie) else {
782                    return Err(RunError::UnexpectedMessageReceived(msg.into()));
783                };
784
785                let (send, recv) = mpsc::unbounded();
786
787                match mem::replace(sender, SenderState::Established(send)) {
788                    SenderState::Pending(send) => {
789                        let _ = send.send(Ok((recv, capacity)));
790                        Ok(())
791                    }
792
793                    SenderState::Established(_) | SenderState::ReceiverClosed => {
794                        Err(RunError::UnexpectedMessageReceived(msg.into()))
795                    }
796                }
797            }
798        }
799    }
800
801    fn msg_item_received(&self, msg: ItemReceived) -> Result<(), RunError<T::Error>> {
802        if let Some(ReceiverState::Established(send)) = self.receivers.get(&msg.cookie) {
803            let _ = send.unbounded_send(msg.value);
804            Ok(())
805        } else {
806            Err(RunError::UnexpectedMessageReceived(msg.into()))
807        }
808    }
809
810    fn msg_add_channel_capacity(&self, msg: AddChannelCapacity) -> Result<(), RunError<T::Error>> {
811        if let Some(SenderState::Established(send)) = self.senders.get(&msg.cookie) {
812            let _ = send.unbounded_send(msg.capacity);
813            Ok(())
814        } else {
815            Err(RunError::UnexpectedMessageReceived(msg.into()))
816        }
817    }
818
819    fn msg_sync_reply(&mut self, msg: SyncReply) -> Result<(), RunError<T::Error>> {
820        if let Some(req) = self.sync.remove(msg.serial) {
821            let _ = req.send(Instant::now());
822            Ok(())
823        } else {
824            Err(RunError::UnexpectedMessageReceived(msg.into()))
825        }
826    }
827
828    fn msg_create_bus_listener_reply(
829        &mut self,
830        msg: CreateBusListenerReply,
831    ) -> Result<(), RunError<T::Error>> {
832        let Some(data) = self.create_bus_listener.remove(msg.serial) else {
833            return Err(RunError::UnexpectedMessageReceived(msg.into()));
834        };
835
836        let (send, recv) = mpsc::unbounded();
837
838        match data {
839            CreateBusListenerData::BusListener(reply) => {
840                let listener = BusListener::new_impl(msg.cookie, self.handle.clone(), recv);
841                let _ = reply.send(listener);
842            }
843
844            CreateBusListenerData::LifetimeListener(reply) => {
845                let listener = LifetimeListener::new(msg.cookie, self.handle.clone(), recv);
846                let _ = reply.send(listener);
847            }
848        }
849
850        let bus_listener_handle = BusListenerHandle::new(send);
851        let dup = self.bus_listeners.insert(msg.cookie, bus_listener_handle);
852        assert!(dup.is_none());
853
854        Ok(())
855    }
856
857    fn msg_destroy_bus_listener_reply(
858        &mut self,
859        msg: DestroyBusListenerReply,
860    ) -> Result<(), RunError<T::Error>> {
861        let Some(req) = self.destroy_bus_listener.remove(msg.serial) else {
862            return Err(RunError::UnexpectedMessageReceived(msg.into()));
863        };
864
865        if msg.result == DestroyBusListenerResult::Ok {
866            let contained = self.bus_listeners.remove(&req.cookie);
867            debug_assert!(contained.is_some());
868        }
869
870        let _ = req.reply.send(msg.result);
871
872        Ok(())
873    }
874
875    fn msg_start_bus_listener_reply(
876        &mut self,
877        msg: StartBusListenerReply,
878    ) -> Result<(), RunError<T::Error>> {
879        let Some(req) = self.start_bus_listener.remove(msg.serial) else {
880            return Err(RunError::UnexpectedMessageReceived(msg.into()));
881        };
882
883        if msg.result == StartBusListenerResult::Ok {
884            let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
885                return Err(RunError::UnexpectedMessageReceived(msg.into()));
886            };
887
888            if bus_listener.start(req.scope) {
889                let _ = req.reply.send(msg.result);
890                Ok(())
891            } else {
892                Err(RunError::UnexpectedMessageReceived(msg.into()))
893            }
894        } else {
895            let _ = req.reply.send(msg.result);
896            Ok(())
897        }
898    }
899
900    fn msg_stop_bus_listener_reply(
901        &mut self,
902        msg: StopBusListenerReply,
903    ) -> Result<(), RunError<T::Error>> {
904        let Some(req) = self.stop_bus_listener.remove(msg.serial) else {
905            return Err(RunError::UnexpectedMessageReceived(msg.into()));
906        };
907
908        if msg.result == StopBusListenerResult::Ok {
909            let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
910                return Err(RunError::UnexpectedMessageReceived(msg.into()));
911            };
912
913            if bus_listener.stop() {
914                let _ = req.reply.send(msg.result);
915                Ok(())
916            } else {
917                Err(RunError::UnexpectedMessageReceived(msg.into()))
918            }
919        } else {
920            let _ = req.reply.send(msg.result);
921            Ok(())
922        }
923    }
924
925    fn msg_emit_bus_event(&self, msg: EmitBusEvent) -> Result<(), RunError<T::Error>> {
926        if let Some(cookie) = msg.cookie {
927            let Some(bus_listener) = self.bus_listeners.get(&cookie) else {
928                return Err(RunError::UnexpectedMessageReceived(msg.into()));
929            };
930
931            if bus_listener.emit_current(msg.event) {
932                Ok(())
933            } else {
934                Err(RunError::UnexpectedMessageReceived(msg.into()))
935            }
936        } else {
937            for bus_listener in self.bus_listeners.values() {
938                bus_listener.emit_new_if_matches(msg.event);
939            }
940
941            Ok(())
942        }
943    }
944
945    fn msg_bus_listener_current_finished(
946        &mut self,
947        msg: BusListenerCurrentFinished,
948    ) -> Result<(), RunError<T::Error>> {
949        if let Some(bus_listener) = self.bus_listeners.get_mut(&msg.cookie) {
950            if bus_listener.current_finished() {
951                Ok(())
952            } else {
953                Err(RunError::UnexpectedMessageReceived(msg.into()))
954            }
955        } else {
956            Err(RunError::UnexpectedMessageReceived(msg.into()))
957        }
958    }
959
960    fn msg_abort_function_call(
961        &mut self,
962        msg: AbortFunctionCall,
963    ) -> Result<(), RunError<T::Error>> {
964        if self.protocol_version >= ProtocolVersion::V1_16 {
965            self.abort_call_handles.remove(&msg.serial);
966            Ok(())
967        } else {
968            Err(RunError::UnexpectedMessageReceived(msg.into()))
969        }
970    }
971
972    #[cfg(feature = "introspection")]
973    async fn msg_query_introspection(
974        &mut self,
975        msg: QueryIntrospection,
976    ) -> Result<(), RunError<T::Error>> {
977        if self.protocol_version >= ProtocolVersion::V1_17 {
978            let result = if let Some(introspection) = self.introspection.get(&msg.type_id) {
979                QueryIntrospectionResult::Ok(introspection.clone())
980            } else {
981                QueryIntrospectionResult::Unavailable
982            };
983
984            self.t
985                .send_and_flush(QueryIntrospectionReply {
986                    serial: msg.serial,
987                    result,
988                })
989                .await
990                .map_err(Into::into)
991        } else {
992            Err(RunError::UnexpectedMessageReceived(msg.into()))
993        }
994    }
995
996    #[cfg(not(feature = "introspection"))]
997    async fn msg_query_introspection(
998        &mut self,
999        msg: QueryIntrospection,
1000    ) -> Result<(), RunError<T::Error>> {
1001        self.t
1002            .send_and_flush(QueryIntrospectionReply {
1003                serial: msg.serial,
1004                result: QueryIntrospectionResult::Unavailable,
1005            })
1006            .await
1007            .map_err(Into::into)
1008    }
1009
1010    #[cfg(feature = "introspection")]
1011    fn msg_query_introspection_reply(
1012        &mut self,
1013        msg: QueryIntrospectionReply,
1014    ) -> Result<(), RunError<T::Error>> {
1015        if self.protocol_version < ProtocolVersion::V1_17 {
1016            return Err(RunError::UnexpectedMessageReceived(msg.into()));
1017        }
1018
1019        let Some(req) = self.query_introspection.remove(msg.serial) else {
1020            return Err(RunError::UnexpectedMessageReceived(msg.into()));
1021        };
1022
1023        match msg.result {
1024            QueryIntrospectionResult::Ok(introspection) => {
1025                let _ = req.reply.send(Some(introspection));
1026            }
1027
1028            QueryIntrospectionResult::Unavailable => {
1029                let _ = req.reply.send(None);
1030            }
1031        }
1032
1033        Ok(())
1034    }
1035
1036    #[cfg(not(feature = "introspection"))]
1037    fn msg_query_introspection_reply(
1038        &mut self,
1039        msg: QueryIntrospectionReply,
1040    ) -> Result<(), RunError<T::Error>> {
1041        Err(RunError::UnexpectedMessageReceived(msg.into()))
1042    }
1043
1044    async fn msg_query_service_info_reply(
1045        &mut self,
1046        msg: QueryServiceInfoReply,
1047    ) -> Result<(), RunError<T::Error>> {
1048        let Some(req) = self.query_service_info.remove(msg.serial) else {
1049            return Err(RunError::UnexpectedMessageReceived(msg.into()));
1050        };
1051
1052        debug_assert!(self.protocol_version >= ProtocolVersion::V1_17);
1053
1054        let info = match msg.result {
1055            QueryServiceInfoResult::Ok(info) => {
1056                info.deserialize().map_err(RunError::Deserialize).map(Ok)?
1057            }
1058
1059            QueryServiceInfoResult::InvalidService => Err(Error::InvalidService),
1060        };
1061
1062        self.finish_create_proxy(req, info).await?;
1063        Ok(())
1064    }
1065
1066    async fn msg_query_service_version_reply(
1067        &mut self,
1068        msg: QueryServiceVersionReply,
1069    ) -> Result<(), RunError<T::Error>> {
1070        let Some(req) = self.query_service_version.remove(msg.serial) else {
1071            return Err(RunError::UnexpectedMessageReceived(msg.into()));
1072        };
1073
1074        // We never send QueryServiceVersion on protocol versions >= 1.17.
1075        debug_assert!(self.protocol_version < ProtocolVersion::V1_17);
1076
1077        let info = match msg.result {
1078            QueryServiceVersionResult::Ok(version) => Ok(ServiceInfo::new(version)),
1079            QueryServiceVersionResult::InvalidService => Err(Error::InvalidService),
1080        };
1081
1082        self.finish_create_proxy(req, info).await?;
1083        Ok(())
1084    }
1085
1086    async fn finish_create_proxy(
1087        &mut self,
1088        req: CreateProxyRequest,
1089        info: Result<ServiceInfo, Error>,
1090    ) -> Result<(), RunError<T::Error>> {
1091        let info = match info {
1092            Ok(info) => info,
1093
1094            Err(e) => {
1095                let _ = req.reply.send(Err(e));
1096                return Ok(());
1097            }
1098        };
1099
1100        let (proxy, subscribe_service) =
1101            self.proxies.create(self.handle.clone(), req.service, info);
1102        let _ = req.reply.send(Ok(proxy));
1103
1104        if subscribe_service && (self.protocol_version >= ProtocolVersion::V1_18) {
1105            let serial = self.subscribe_service.insert(req.service.cookie);
1106
1107            self.t
1108                .send_and_flush(SubscribeService {
1109                    serial,
1110                    service_cookie: req.service.cookie,
1111                })
1112                .await?;
1113        }
1114
1115        Ok(())
1116    }
1117
1118    fn msg_subscribe_event_reply(
1119        &mut self,
1120        msg: SubscribeEventReply,
1121    ) -> Result<(), RunError<T::Error>> {
1122        if let Some(req) = self.subscribe_event.remove(msg.serial) {
1123            let res = match msg.result {
1124                SubscribeEventResult::Ok => Ok(()),
1125                SubscribeEventResult::InvalidService => Err(Error::InvalidService),
1126            };
1127
1128            let _ = req.reply.send(res);
1129            Ok(())
1130        } else {
1131            Err(RunError::UnexpectedMessageReceived(msg.into()))
1132        }
1133    }
1134
1135    fn msg_emit_event(&self, msg: EmitEvent) {
1136        self.proxies
1137            .emit(msg.service_cookie, msg.event, Instant::now(), msg.value);
1138    }
1139
1140    fn msg_service_destroyed(&mut self, msg: ServiceDestroyed) {
1141        self.proxies.remove_service(msg.service_cookie);
1142    }
1143
1144    fn msg_subscribe_service_reply(
1145        &mut self,
1146        msg: SubscribeServiceReply,
1147    ) -> Result<(), RunError<T::Error>> {
1148        if let Some(service) = self.subscribe_service.remove(msg.serial) {
1149            debug_assert!(self.protocol_version >= ProtocolVersion::V1_18);
1150
1151            if msg.result == SubscribeServiceResult::InvalidService {
1152                self.proxies.remove_service(service);
1153            }
1154
1155            Ok(())
1156        } else {
1157            Err(RunError::UnexpectedMessageReceived(msg.into()))
1158        }
1159    }
1160
1161    fn msg_subscribe_all_events(
1162        &mut self,
1163        msg: SubscribeAllEvents,
1164    ) -> Result<(), RunError<T::Error>> {
1165        if (self.protocol_version >= ProtocolVersion::V1_18) && msg.serial.is_none() {
1166            self.broker_subscriptions.subscribe_all(msg.service_cookie);
1167            Ok(())
1168        } else {
1169            Err(RunError::UnexpectedMessageReceived(msg.into()))
1170        }
1171    }
1172
1173    fn msg_subscribe_all_events_reply(
1174        &mut self,
1175        msg: SubscribeAllEventsReply,
1176    ) -> Result<(), RunError<T::Error>> {
1177        let Some(req) = self.subscribe_all_events.remove(msg.serial) else {
1178            return Err(RunError::UnexpectedMessageReceived(msg.into()));
1179        };
1180
1181        let res = match msg.result {
1182            SubscribeAllEventsResult::Ok => Ok(()),
1183            SubscribeAllEventsResult::InvalidService => Err(Error::InvalidService),
1184
1185            SubscribeAllEventsResult::NotSupported => {
1186                return Err(RunError::UnexpectedMessageReceived(msg.into()))
1187            }
1188        };
1189
1190        let _ = req.reply.send(res);
1191        Ok(())
1192    }
1193
1194    fn msg_unsubscribe_all_events(
1195        &mut self,
1196        msg: UnsubscribeAllEvents,
1197    ) -> Result<(), RunError<T::Error>> {
1198        if (self.protocol_version >= ProtocolVersion::V1_18) && msg.serial.is_none() {
1199            self.broker_subscriptions
1200                .unsubscribe_all(msg.service_cookie);
1201            Ok(())
1202        } else {
1203            Err(RunError::UnexpectedMessageReceived(msg.into()))
1204        }
1205    }
1206
1207    fn msg_unsubscribe_all_events_reply(
1208        &mut self,
1209        msg: UnsubscribeAllEventsReply,
1210    ) -> Result<(), RunError<T::Error>> {
1211        let Some(req) = self.unsubscribe_all_events.remove(msg.serial) else {
1212            return Err(RunError::UnexpectedMessageReceived(msg.into()));
1213        };
1214
1215        let res = match msg.result {
1216            UnsubscribeAllEventsResult::Ok => Ok(()),
1217            UnsubscribeAllEventsResult::InvalidService => Err(Error::InvalidService),
1218
1219            UnsubscribeAllEventsResult::NotSupported => {
1220                return Err(RunError::UnexpectedMessageReceived(msg.into()))
1221            }
1222        };
1223
1224        let _ = req.reply.send(res);
1225        Ok(())
1226    }
1227
    /// Dispatches a single request to the matching `req_*` method.
    ///
    /// Most requests are forwarded to an async helper whose error is propagated with `?`. A few
    /// are handled synchronously (`HandleCloned`, `HandleDropped`, `SyncClient`,
    /// `GetProtocolVersion` and, with the `introspection` feature, `RegisterIntrospection`).
    ///
    /// # Panics
    ///
    /// Panics via `unreachable!()` if called with `HandleRequest::Shutdown`.
    async fn handle_request(&mut self, req: HandleRequest) -> Result<(), RunError<T::Error>> {
        match req {
            HandleRequest::HandleCloned => self.req_handle_cloned(),
            HandleRequest::HandleDropped => self.req_handle_dropped(),
            HandleRequest::CreateObject(req) => self.req_create_object(req).await?,
            HandleRequest::DestroyObject(req) => self.req_destroy_object(req).await?,
            HandleRequest::CreateService(req) => self.req_create_service(req).await?,
            HandleRequest::DestroyService(req) => self.req_destroy_service(req).await?,
            HandleRequest::CallFunction(req) => self.req_call_function(req).await?,
            HandleRequest::CallFunctionReply(req) => self.req_call_function_reply(req).await?,
            HandleRequest::EmitEvent(req) => self.req_emit_event(req).await?,
            HandleRequest::CreateClaimedSender(req) => self.req_create_claimed_sender(req).await?,
            HandleRequest::CreateClaimedReceiver(req) => {
                self.req_create_claimed_receiver(req).await?
            }
            HandleRequest::CloseChannelEnd(req) => self.req_close_channel_end(req).await?,
            HandleRequest::ClaimSender(req) => self.req_claim_sender(req).await?,
            HandleRequest::ClaimReceiver(req) => self.req_claim_receiver(req).await?,
            HandleRequest::SendItem(req) => self.req_send_item(req).await?,
            HandleRequest::AddChannelCapacity(req) => self.req_add_channel_capacity(req).await?,
            HandleRequest::SyncClient(req) => self.req_sync_client(req),
            HandleRequest::SyncBroker(req) => self.req_sync_broker(req).await?,
            HandleRequest::CreateBusListener(req) => self.req_create_bus_listener(req).await?,
            HandleRequest::DestroyBusListener(req) => self.req_destroy_bus_listener(req).await?,
            HandleRequest::AddBusListenerFilter(req) => {
                self.req_add_bus_listener_filter(req).await?
            }
            HandleRequest::RemoveBusListenerFilter(req) => {
                self.req_remove_bus_listener_filter(req).await?
            }
            HandleRequest::ClearBusListenerFilters(req) => {
                self.req_clear_bus_listener_filters(req).await?
            }
            HandleRequest::StartBusListener(req) => self.req_start_bus_listener(req).await?,
            HandleRequest::StopBusListener(req) => self.req_stop_bus_listener(req).await?,
            HandleRequest::CreateLifetimeListener(req) => {
                self.req_create_lifetime_listener(req).await?
            }
            // Answered directly from local state; a failed send is ignored.
            HandleRequest::GetProtocolVersion(req) => {
                let _ = req.send(self.protocol_version);
            }
            HandleRequest::CreateProxy(req) => self.req_create_proxy(req).await?,
            HandleRequest::DestroyProxy(proxy) => self.req_destroy_proxy(proxy).await?,
            HandleRequest::SubscribeEvent(req) => self.req_subscribe_event(req).await?,
            HandleRequest::UnsubscribeEvent(req) => self.req_unsubscribe_event(req).await?,
            HandleRequest::SubscribeAllEvents(req) => self.req_subscribe_all_events(req).await?,
            HandleRequest::UnsubscribeAllEvents(req) => {
                self.req_unsubscribe_all_events(req).await?
            }
            #[cfg(feature = "introspection")]
            HandleRequest::RegisterIntrospection(ty) => self.req_register_introspection(ty),
            #[cfg(feature = "introspection")]
            HandleRequest::SubmitIntrospection => self.req_submit_introspection().await?,
            #[cfg(feature = "introspection")]
            HandleRequest::QueryIntrospection(req) => self.req_query_introspection(req).await?,

            // Handled in Client::run()
            HandleRequest::Shutdown => unreachable!(),
        }

        Ok(())
    }
1290
1291    fn req_handle_cloned(&mut self) {
1292        self.num_handles += 1;
1293    }
1294
1295    fn req_handle_dropped(&mut self) {
1296        self.num_handles -= 1;
1297        debug_assert!(self.num_handles >= 1);
1298    }
1299
1300    async fn req_create_object(
1301        &mut self,
1302        req: CreateObjectRequest,
1303    ) -> Result<(), RunError<T::Error>> {
1304        let uuid = req.uuid;
1305        let serial = self.create_object.insert(req);
1306
1307        self.t
1308            .send_and_flush(CreateObject { serial, uuid })
1309            .await
1310            .map_err(Into::into)
1311    }
1312
1313    async fn req_destroy_object(
1314        &mut self,
1315        req: DestroyObjectRequest,
1316    ) -> Result<(), RunError<T::Error>> {
1317        let serial = self.destroy_object.insert(req.reply);
1318
1319        self.t
1320            .send_and_flush(DestroyObject {
1321                serial,
1322                cookie: req.cookie,
1323            })
1324            .await
1325            .map_err(Into::into)
1326    }
1327
1328    async fn req_create_service(
1329        &mut self,
1330        req: CreateServiceRequest,
1331    ) -> Result<(), RunError<T::Error>> {
1332        let object_cookie = req.object_id.cookie;
1333        let uuid = req.service_uuid;
1334
1335        if self.protocol_version >= ProtocolVersion::V1_17 {
1336            let mut info = req.info.to_core();
1337            if self.protocol_version >= ProtocolVersion::V1_18 {
1338                info = info.set_subscribe_all(true);
1339            }
1340
1341            let serial = self.create_service.insert(req);
1342
1343            let msg = CreateService2::with_serialize_info(serial, object_cookie, uuid, info)
1344                .map_err(RunError::Serialize)?;
1345
1346            self.t.send_and_flush(msg).await.map_err(Into::into)
1347        } else {
1348            let version = req.info.version();
1349            let serial = self.create_service.insert(req);
1350
1351            self.t
1352                .send_and_flush(CreateService {
1353                    serial,
1354                    object_cookie,
1355                    uuid,
1356                    version,
1357                })
1358                .await
1359                .map_err(Into::into)
1360        }
1361    }
1362
1363    async fn req_destroy_service(
1364        &mut self,
1365        req: DestroyServiceRequest,
1366    ) -> Result<(), RunError<T::Error>> {
1367        let cookie = req.id.cookie;
1368        let serial = self.destroy_service.insert(req);
1369
1370        self.t
1371            .send_and_flush(DestroyService { serial, cookie })
1372            .await
1373            .map_err(Into::into)
1374    }
1375
1376    async fn req_call_function(
1377        &mut self,
1378        req: CallFunctionRequest,
1379    ) -> Result<(), RunError<T::Error>> {
1380        let serial = self.function_calls.insert(req.reply);
1381
1382        if self.protocol_version >= ProtocolVersion::V1_19 {
1383            self.t
1384                .send_and_flush(CallFunction2 {
1385                    serial,
1386                    service_cookie: req.service_cookie,
1387                    function: req.function,
1388                    version: req.version,
1389                    value: req.value,
1390                })
1391                .await
1392                .map_err(Into::into)
1393        } else {
1394            self.t
1395                .send_and_flush(CallFunction {
1396                    serial,
1397                    service_cookie: req.service_cookie,
1398                    function: req.function,
1399                    value: req.value,
1400                })
1401                .await
1402                .map_err(Into::into)
1403        }
1404    }
1405
1406    async fn req_call_function_reply(
1407        &mut self,
1408        req: CallFunctionReplyRequest,
1409    ) -> Result<(), RunError<T::Error>> {
1410        self.abort_call_handles.remove(&req.serial);
1411
1412        self.t
1413            .send_and_flush(CallFunctionReply {
1414                serial: req.serial,
1415                result: req.result,
1416            })
1417            .await
1418            .map_err(Into::into)
1419    }
1420
1421    async fn req_emit_event(&mut self, req: EmitEventRequest) -> Result<(), RunError<T::Error>> {
1422        if self
1423            .broker_subscriptions
1424            .emit(req.service_cookie, req.event)
1425        {
1426            self.t
1427                .send_and_flush(EmitEvent {
1428                    service_cookie: req.service_cookie,
1429                    event: req.event,
1430                    value: req.value,
1431                })
1432                .await?
1433        }
1434
1435        Ok(())
1436    }
1437
1438    async fn req_create_claimed_sender(
1439        &mut self,
1440        req: CreateClaimedSenderRequest,
1441    ) -> Result<(), RunError<T::Error>> {
1442        let serial = self.create_channel.insert(CreateChannelData::Sender(req));
1443
1444        self.t
1445            .send_and_flush(CreateChannel {
1446                serial,
1447                end: ChannelEndWithCapacity::Sender,
1448            })
1449            .await
1450            .map_err(Into::into)
1451    }
1452
1453    async fn req_create_claimed_receiver(
1454        &mut self,
1455        req: CreateClaimedReceiverRequest,
1456    ) -> Result<(), RunError<T::Error>> {
1457        let capacity = req.capacity.get();
1458        let serial = self.create_channel.insert(CreateChannelData::Receiver(req));
1459
1460        self.t
1461            .send_and_flush(CreateChannel {
1462                serial,
1463                end: ChannelEndWithCapacity::Receiver(capacity),
1464            })
1465            .await
1466            .map_err(Into::into)
1467    }
1468
1469    async fn req_close_channel_end(
1470        &mut self,
1471        req: CloseChannelEndRequest,
1472    ) -> Result<(), RunError<T::Error>> {
1473        let cookie = req.cookie;
1474        let end = req.end;
1475
1476        let serial = self.close_channel_end.insert(req);
1477
1478        self.t
1479            .send_and_flush(CloseChannelEnd {
1480                serial,
1481                cookie,
1482                end,
1483            })
1484            .await
1485            .map_err(Into::into)
1486    }
1487
1488    async fn req_claim_sender(
1489        &mut self,
1490        req: ClaimSenderRequest,
1491    ) -> Result<(), RunError<T::Error>> {
1492        let cookie = req.cookie;
1493
1494        let serial = self
1495            .claim_channel_end
1496            .insert(ClaimChannelEndData::Sender(req));
1497
1498        self.t
1499            .send_and_flush(ClaimChannelEnd {
1500                serial,
1501                cookie,
1502                end: ChannelEndWithCapacity::Sender,
1503            })
1504            .await
1505            .map_err(Into::into)
1506    }
1507
1508    async fn req_claim_receiver(
1509        &mut self,
1510        req: ClaimReceiverRequest,
1511    ) -> Result<(), RunError<T::Error>> {
1512        let cookie = req.cookie;
1513        let capacity = req.capacity.get();
1514
1515        let serial = self
1516            .claim_channel_end
1517            .insert(ClaimChannelEndData::Receiver(req));
1518
1519        self.t
1520            .send_and_flush(ClaimChannelEnd {
1521                serial,
1522                cookie,
1523                end: ChannelEndWithCapacity::Receiver(capacity),
1524            })
1525            .await
1526            .map_err(Into::into)
1527    }
1528
1529    async fn req_send_item(&mut self, req: SendItemRequest) -> Result<(), RunError<T::Error>> {
1530        debug_assert!(self.senders.contains_key(&req.cookie));
1531
1532        self.t
1533            .send_and_flush(SendItem {
1534                cookie: req.cookie,
1535                value: req.value,
1536            })
1537            .await
1538            .map_err(Into::into)
1539    }
1540
1541    async fn req_add_channel_capacity(
1542        &mut self,
1543        req: AddChannelCapacity,
1544    ) -> Result<(), RunError<T::Error>> {
1545        debug_assert!(self.receivers.contains_key(&req.cookie));
1546        self.t.send_and_flush(req).await.map_err(Into::into)
1547    }
1548
1549    fn req_sync_client(&self, req: SyncClientRequest) {
1550        let _ = req.send(Instant::now());
1551    }
1552
1553    async fn req_sync_broker(&mut self, req: SyncBrokerRequest) -> Result<(), RunError<T::Error>> {
1554        let serial = self.sync.insert(req);
1555
1556        self.t
1557            .send_and_flush(Sync { serial })
1558            .await
1559            .map_err(Into::into)
1560    }
1561
1562    async fn req_create_bus_listener(
1563        &mut self,
1564        req: CreateBusListenerRequest,
1565    ) -> Result<(), RunError<T::Error>> {
1566        let serial = self
1567            .create_bus_listener
1568            .insert(CreateBusListenerData::BusListener(req));
1569
1570        self.t
1571            .send_and_flush(CreateBusListener { serial })
1572            .await
1573            .map_err(Into::into)
1574    }
1575
1576    async fn req_destroy_bus_listener(
1577        &mut self,
1578        req: DestroyBusListenerRequest,
1579    ) -> Result<(), RunError<T::Error>> {
1580        let cookie = req.cookie;
1581        let serial = self.destroy_bus_listener.insert(req);
1582
1583        self.t
1584            .send_and_flush(DestroyBusListener { serial, cookie })
1585            .await
1586            .map_err(Into::into)
1587    }
1588
1589    async fn req_add_bus_listener_filter(
1590        &mut self,
1591        req: AddBusListenerFilter,
1592    ) -> Result<(), RunError<T::Error>> {
1593        let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
1594            return Ok(());
1595        };
1596
1597        self.t.send_and_flush(req).await?;
1598        bus_listener.add_filter(req.filter);
1599
1600        Ok(())
1601    }
1602
1603    async fn req_remove_bus_listener_filter(
1604        &mut self,
1605        req: RemoveBusListenerFilter,
1606    ) -> Result<(), RunError<T::Error>> {
1607        let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
1608            return Ok(());
1609        };
1610
1611        self.t.send_and_flush(req).await?;
1612        bus_listener.remove_filter(req.filter);
1613
1614        Ok(())
1615    }
1616
1617    async fn req_clear_bus_listener_filters(
1618        &mut self,
1619        req: ClearBusListenerFilters,
1620    ) -> Result<(), RunError<T::Error>> {
1621        let Some(bus_listener) = self.bus_listeners.get_mut(&req.cookie) else {
1622            return Ok(());
1623        };
1624
1625        self.t.send_and_flush(req).await?;
1626        bus_listener.clear_filters();
1627
1628        Ok(())
1629    }
1630
1631    async fn req_start_bus_listener(
1632        &mut self,
1633        req: StartBusListenerRequest,
1634    ) -> Result<(), RunError<T::Error>> {
1635        let cookie = req.cookie;
1636        let scope = req.scope;
1637        let serial = self.start_bus_listener.insert(req);
1638
1639        self.t
1640            .send_and_flush(StartBusListener {
1641                serial,
1642                cookie,
1643                scope,
1644            })
1645            .await
1646            .map_err(Into::into)
1647    }
1648
1649    async fn req_stop_bus_listener(
1650        &mut self,
1651        req: StopBusListenerRequest,
1652    ) -> Result<(), RunError<T::Error>> {
1653        let cookie = req.cookie;
1654        let serial = self.stop_bus_listener.insert(req);
1655
1656        self.t
1657            .send_and_flush(StopBusListener { serial, cookie })
1658            .await
1659            .map_err(Into::into)
1660    }
1661
1662    async fn req_create_lifetime_listener(
1663        &mut self,
1664        req: CreateLifetimeListenerRequest,
1665    ) -> Result<(), RunError<T::Error>> {
1666        let serial = self
1667            .create_bus_listener
1668            .insert(CreateBusListenerData::LifetimeListener(req));
1669
1670        self.t
1671            .send_and_flush(CreateBusListener { serial })
1672            .await
1673            .map_err(Into::into)
1674    }
1675
1676    async fn req_create_proxy(
1677        &mut self,
1678        req: CreateProxyRequest,
1679    ) -> Result<(), RunError<T::Error>> {
1680        let msg = if self.protocol_version >= ProtocolVersion::V1_17 {
1681            let cookie = req.service.cookie;
1682            let serial = self.query_service_info.insert(req);
1683            Message::QueryServiceInfo(QueryServiceInfo { serial, cookie })
1684        } else {
1685            let cookie = req.service.cookie;
1686            let serial = self.query_service_version.insert(req);
1687            Message::QueryServiceVersion(QueryServiceVersion { serial, cookie })
1688        };
1689
1690        self.t.send_and_flush(msg).await.map_err(Into::into)
1691    }
1692
1693    async fn req_destroy_proxy(&mut self, proxy: ProxyId) -> Result<(), RunError<T::Error>> {
1694        if let Some(res) = self.proxies.remove(proxy) {
1695            if res.unsubscribe && (self.protocol_version >= ProtocolVersion::V1_18) {
1696                self.t
1697                    .send(UnsubscribeService {
1698                        service_cookie: res.service,
1699                    })
1700                    .await?;
1701            }
1702
1703            for event in res.events {
1704                self.t
1705                    .send(UnsubscribeEvent {
1706                        service_cookie: res.service,
1707                        event,
1708                    })
1709                    .await?;
1710            }
1711
1712            if res.all_events {
1713                debug_assert!(self.protocol_version >= ProtocolVersion::V1_18);
1714                self.t
1715                    .send(UnsubscribeAllEvents {
1716                        serial: None,
1717                        service_cookie: res.service,
1718                    })
1719                    .await?;
1720            }
1721
1722            self.t.flush().await?;
1723        }
1724
1725        Ok(())
1726    }
1727
1728    async fn req_subscribe_event(
1729        &mut self,
1730        req: SubscribeEventRequest,
1731    ) -> Result<(), RunError<T::Error>> {
1732        match self.proxies.subscribe(req.proxy, req.event) {
1733            SubscribeResult::Forward(service_cookie) => {
1734                let event = req.event;
1735                let serial = self.subscribe_event.insert(req);
1736
1737                self.t
1738                    .send_and_flush(SubscribeEvent {
1739                        serial: Some(serial),
1740                        service_cookie,
1741                        event,
1742                    })
1743                    .await?;
1744            }
1745
1746            SubscribeResult::Noop => {
1747                let _ = req.reply.send(Ok(()));
1748            }
1749
1750            SubscribeResult::InvalidProxy => {
1751                let _ = req.reply.send(Err(Error::InvalidService));
1752            }
1753        }
1754
1755        Ok(())
1756    }
1757
1758    async fn req_unsubscribe_event(
1759        &mut self,
1760        req: UnsubscribeEventRequest,
1761    ) -> Result<(), RunError<T::Error>> {
1762        match self.proxies.unsubscribe(req.proxy, req.event) {
1763            SubscribeResult::Forward(service_cookie) => {
1764                self.t
1765                    .send_and_flush(UnsubscribeEvent {
1766                        service_cookie,
1767                        event: req.event,
1768                    })
1769                    .await?;
1770
1771                let _ = req.reply.send(Ok(()));
1772            }
1773
1774            SubscribeResult::Noop => {
1775                let _ = req.reply.send(Ok(()));
1776            }
1777
1778            SubscribeResult::InvalidProxy => {
1779                let _ = req.reply.send(Err(Error::InvalidService));
1780            }
1781        }
1782
1783        Ok(())
1784    }
1785
1786    async fn req_subscribe_all_events(
1787        &mut self,
1788        req: SubscribeAllEventsRequest,
1789    ) -> Result<(), RunError<T::Error>> {
1790        if self.protocol_version >= ProtocolVersion::V1_18 {
1791            match self.proxies.subscribe_all(req.proxy) {
1792                SubscribeResult::Forward(service_cookie) => {
1793                    let serial = self.subscribe_all_events.insert(req);
1794
1795                    self.t
1796                        .send_and_flush(SubscribeAllEvents {
1797                            serial: Some(serial),
1798                            service_cookie,
1799                        })
1800                        .await?;
1801                }
1802
1803                SubscribeResult::Noop => {
1804                    let _ = req.reply.send(Ok(()));
1805                }
1806
1807                SubscribeResult::InvalidProxy => {
1808                    let _ = req.reply.send(Err(Error::InvalidService));
1809                }
1810            }
1811        } else {
1812            let _ = req.reply.send(Err(Error::NotSupported));
1813        }
1814
1815        Ok(())
1816    }
1817
1818    async fn req_unsubscribe_all_events(
1819        &mut self,
1820        req: UnsubscribeAllEventsRequest,
1821    ) -> Result<(), RunError<T::Error>> {
1822        let Some(res) = self.proxies.unsubscribe_all(req.proxy) else {
1823            let _ = req.reply.send(Err(Error::InvalidService));
1824            return Ok(());
1825        };
1826
1827        for event in res.events {
1828            self.t
1829                .send(UnsubscribeEvent {
1830                    service_cookie: res.service,
1831                    event,
1832                })
1833                .await?;
1834        }
1835
1836        if res.all_events {
1837            debug_assert!(self.protocol_version >= ProtocolVersion::V1_18);
1838            let serial = self.unsubscribe_all_events.insert(req);
1839
1840            self.t
1841                .send(UnsubscribeAllEvents {
1842                    serial: Some(serial),
1843                    service_cookie: res.service,
1844                })
1845                .await?;
1846        } else {
1847            let _ = req.reply.send(Ok(()));
1848        }
1849
1850        self.t.flush().await?;
1851        Ok(())
1852    }
1853
1854    #[cfg(feature = "introspection")]
1855    fn req_register_introspection(&mut self, ty: DynIntrospectable) {
1856        use std::collections::hash_map::Entry;
1857
1858        let mut types = vec![ty];
1859
1860        while let Some(ty) = types.pop() {
1861            let introspection = Introspection::from_dyn(ty);
1862
1863            let Entry::Vacant(entry) = self.introspection.entry(introspection.type_id()) else {
1864                continue;
1865            };
1866
1867            let Ok(introspection) = SerializedValue::serialize(&introspection) else {
1868                continue;
1869            };
1870
1871            ty.add_references(&mut References::new(&mut types));
1872            entry.insert(introspection);
1873        }
1874    }
1875
1876    #[cfg(feature = "introspection")]
1877    async fn req_submit_introspection(&mut self) -> Result<(), RunError<T::Error>> {
1878        use crate::core::message::RegisterIntrospection;
1879
1880        if (self.protocol_version >= ProtocolVersion::V1_17) && !self.introspection.is_empty() {
1881            let type_ids = self.introspection.keys().copied().collect();
1882
1883            let register_introspection = RegisterIntrospection::with_serialize_type_ids(&type_ids)
1884                .map_err(RunError::Serialize)?;
1885
1886            self.t
1887                .send_and_flush(register_introspection)
1888                .await
1889                .map_err(Into::into)
1890        } else {
1891            Ok(())
1892        }
1893    }
1894
1895    #[cfg(feature = "introspection")]
1896    async fn req_query_introspection(
1897        &mut self,
1898        req: QueryIntrospectionRequest,
1899    ) -> Result<(), RunError<T::Error>> {
1900        if let Some(introspection) = self.introspection.get(&req.type_id) {
1901            let _ = req.reply.send(Some(introspection.clone()));
1902            Ok(())
1903        } else if self.protocol_version >= ProtocolVersion::V1_17 {
1904            let type_id = req.type_id;
1905            let serial = self.query_introspection.insert(req);
1906
1907            self.t
1908                .send_and_flush(QueryIntrospection { serial, type_id })
1909                .await
1910                .map_err(Into::into)
1911        } else {
1912            let _ = req.reply.send(None);
1913            Ok(())
1914        }
1915    }
1916
1917    async fn abort_function_call(&mut self, serial: u32) -> Result<(), RunError<T::Error>> {
1918        self.function_calls.abort(serial);
1919
1920        if self.protocol_version >= ProtocolVersion::V1_16 {
1921            self.t.send_and_flush(AbortFunctionCall { serial }).await?;
1922        }
1923
1924        Ok(())
1925    }
1926}
1927
/// A channel-creation request, tagged by which end the creator claims.
#[derive(Debug)]
enum CreateChannelData {
    /// Request to create a channel with a claimed sender.
    Sender(CreateClaimedSenderRequest),
    /// Request to create a channel with a claimed receiver.
    Receiver(CreateClaimedReceiverRequest),
}
1933
/// A request to claim one end of a channel, tagged by which end is claimed.
#[derive(Debug)]
enum ClaimChannelEndData {
    /// Request to claim the sending end.
    Sender(ClaimSenderRequest),
    /// Request to claim the receiving end.
    Receiver(ClaimReceiverRequest),
}
1939
/// State tracked for the sending end of a channel.
#[derive(Debug)]
enum SenderState {
    /// Holds a oneshot sender that delivers either an unbounded receiver of
    /// `u32` values together with one `u32`, or an error.
    // NOTE(review): the `u32` values presumably carry channel capacity; confirm
    // against the code that resolves this state.
    Pending(oneshot::Sender<Result<(mpsc::UnboundedReceiver<u32>, u32), Error>>),
    /// Holds the unbounded sender through which `u32` values are forwarded.
    Established(mpsc::UnboundedSender<u32>),
    /// Carries no data; by its name, the receiving end has been closed.
    ReceiverClosed,
}
1946
/// State tracked for the receiving end of a channel.
#[derive(Debug)]
enum ReceiverState {
    /// Holds a oneshot sender that delivers either an unbounded receiver of
    /// serialized values, or an error.
    Pending(oneshot::Sender<Result<mpsc::UnboundedReceiver<SerializedValue>, Error>>),
    /// Holds the unbounded sender through which serialized values are forwarded.
    Established(mpsc::UnboundedSender<SerializedValue>),
    /// Carries no data; by its name, the sending end has been closed.
    SenderClosed,
}
1953
/// A pending `CreateBusListener` request, tagged by what asked for it.
///
/// Both variants are stored in the same pending-request map
/// (`create_bus_listener`) and are sent as the same `CreateBusListener`
/// message.
#[derive(Debug)]
enum CreateBusListenerData {
    /// Request to create a bus listener.
    BusListener(CreateBusListenerRequest),
    /// Request to create a lifetime listener.
    LifetimeListener(CreateLifetimeListenerRequest),
}