Skip to main content

liminal_sdk/remote/
handles.rs

1use alloc::string::{String, ToString};
2use alloc::sync::Arc;
3use alloc::vec::Vec;
4use core::time::Duration;
5
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8use spin::Mutex;
9
10use crate::connection::{
11    ConnectionEvent, ConnectionLifecycle, ConnectionPool, ConnectionState, ReconnectJitter,
12    ResumeRequest, SubscriptionId,
13};
14use crate::embedded::{
15    EmbeddedChannelHandle, EmbeddedConversationHandle, EmptyLifecycleStream, ReadyResult,
16    SdkSubscription,
17};
18use crate::{
19    ChannelHandle, ConversationHandle, ConversationId, DeliveryAck, PressureResponse,
20    SchemaValidate, SdkError,
21};
22
23use super::config::SdkConfig;
24use super::protocol::{
25    RemoteTransport, WireConversationRequest, WirePublishRequest, WireResumeRequest,
26    WireSubscribeRequest, deserialize_payload,
27};
28use super::{RemoteConfig, ServerAddress, connection_error};
29
30#[derive(Debug)]
31struct RemoteChannelState {
32    lifecycle: ConnectionLifecycle,
33    pool: ConnectionPool,
34    next_subscription: u64,
35}
36
37/// Channel handle that communicates through SDK-internal wire protocol transport.
38#[derive(Clone, Debug)]
39pub struct RemoteChannelHandle {
40    server_address: ServerAddress,
41    channel_name: String,
42    state: Arc<Mutex<RemoteChannelState>>,
43    transport: Arc<dyn RemoteTransport>,
44}
45
46impl RemoteChannelHandle {
47    /// Creates a remote channel handle from validated configuration.
48    ///
49    /// # Errors
50    ///
51    /// Returns [`SdkError`] if the connection pool cannot be created.
52    pub fn new(config: &RemoteConfig) -> Result<Self, SdkError> {
53        Ok(Self {
54            server_address: config.server_address.clone(),
55            channel_name: config.channel_name.clone(),
56            state: Arc::new(Mutex::new(RemoteChannelState {
57                lifecycle: ConnectionLifecycle::new(config.reconnect_config),
58                pool: ConnectionPool::new(config.pool_config, config.reconnect_config)?,
59                next_subscription: 0,
60            })),
61            transport: Arc::clone(&config.transport),
62        })
63    }
64
65    /// Returns current lifecycle state from the SDK-003 state machine.
66    #[must_use]
67    pub fn connection_state(&self) -> ConnectionState {
68        self.state.lock().lifecycle.state().clone()
69    }
70
71    /// Drives a reconnect attempt through the SDK-003 lifecycle state machine.
72    ///
73    /// # Errors
74    ///
75    /// Returns [`SdkError`] when the lifecycle rejects the transition.
76    pub fn reconnect<J>(&self, jitter: &mut J) -> Result<Duration, SdkError>
77    where
78        J: ReconnectJitter + ?Sized,
79    {
80        self.state.lock().lifecycle.reconnect(jitter)
81    }
82
83    /// Marks the remote channel connected and builds subscription resume requests.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`SdkError`] if lifecycle transition or recovery state is invalid.
88    pub fn connected(&self) -> Result<Vec<ResumeRequest>, SdkError> {
89        // Compute the resume requests under the lock, then release it before any
90        // transport I/O so concurrent callers do not spin-wait during resume calls.
91        let requests = {
92            let mut state = self.state.lock();
93            let previous = state.lifecycle.state().clone();
94            state.lifecycle.connected()?;
95            let event = ConnectionEvent::new(previous, state.lifecycle.state().clone());
96            state.pool.resume_requests_for_transition(&event)?
97        };
98        for request in &requests {
99            let wire_request = WireResumeRequest::new(*request);
100            self.transport.resume(&self.server_address, &wire_request)?;
101        }
102        Ok(requests)
103    }
104
105    /// Marks a subscription active and assigns it to the configured pool.
106    ///
107    /// # Errors
108    ///
109    /// Returns [`SdkError`] if a subscription id overflows or pool assignment fails.
110    pub fn track_subscription(&self) -> Result<SubscriptionId, SdkError> {
111        let mut state = self.state.lock();
112        let id = SubscriptionId::new(state.next_subscription);
113        state.next_subscription =
114            state
115                .next_subscription
116                .checked_add(1)
117                .ok_or_else(|| SdkError::Store {
118                    description: "subscription id overflow".to_string(),
119                })?;
120        state.pool.assign_subscription(id)?;
121        Ok(id)
122    }
123
124    /// Records an acknowledged sequence for subscription recovery.
125    ///
126    /// # Errors
127    ///
128    /// Returns [`SdkError`] if the subscription is not active in the pool.
129    pub fn acknowledge(
130        &self,
131        subscription_id: SubscriptionId,
132        sequence: u64,
133    ) -> Result<(), SdkError> {
134        let mut state = self.state.lock();
135        state.pool.acknowledge(subscription_id, sequence)
136    }
137
138    /// Returns remote server address used by this handle.
139    #[must_use]
140    pub const fn server_address(&self) -> &ServerAddress {
141        &self.server_address
142    }
143}
144
145impl ChannelHandle for RemoteChannelHandle {
146    type Subscription<M>
147        = SdkSubscription<M>
148    where
149        M: DeserializeOwned;
150
151    type ReplyFuture<'a, Resp>
152        = ReadyResult<Resp>
153    where
154        Self: 'a,
155        Resp: DeserializeOwned + 'a;
156
157    fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
158    where
159        M: Serialize + SchemaValidate,
160    {
161        let request = WirePublishRequest::new(&self.channel_name, &message)?;
162        self.transport.publish(&self.server_address, &request)
163    }
164
165    fn subscribe<M>(&self) -> Self::Subscription<M>
166    where
167        M: DeserializeOwned,
168    {
169        let outcome = self.track_subscription().and_then(|subscription_id| {
170            let connection_id = self
171                .state
172                .lock()
173                .pool
174                .connection_for_subscription(subscription_id)
175                .ok_or_else(|| connection_error("subscription was not assigned to the pool"))?;
176            let request = WireSubscribeRequest::new(
177                &self.channel_name,
178                subscription_id,
179                connection_id.get(),
180            )?;
181            self.transport.subscribe(&self.server_address, &request)
182        });
183
184        match outcome {
185            Ok(()) => SdkSubscription::empty(),
186            Err(error) => SdkSubscription::error(error),
187        }
188    }
189
190    fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
191    where
192        Req: Serialize + SchemaValidate,
193        Resp: DeserializeOwned,
194    {
195        ReadyResult::new(self.request_reply_blocking(&request))
196    }
197}
198
199impl RemoteChannelHandle {
200    /// Publishes a message with an idempotency key and returns a genuine delivery
201    /// ack.
202    ///
203    /// The idempotency key drives dedup-on-delivery on the server: a re-publish of
204    /// the same key is delivered to subscribers at most once. The returned
205    /// [`DeliveryAck`] reports whether this publish was genuinely accepted by a
206    /// subscriber ([`DeliveryAck::is_accepted`]), which a caller such as the aion
207    /// outbox uses to treat the send as done only on real acceptance. This is
208    /// distinct from the backpressure-only [`publish`](ChannelHandle::publish).
209    ///
210    /// # Errors
211    ///
212    /// Returns [`SdkError`] when the message cannot be serialized, the round trip
213    /// fails, or the transport cannot produce a genuine delivery ack.
214    pub fn publish_with_idempotency_key<M>(
215        &self,
216        message: &M,
217        idempotency_key: &str,
218    ) -> Result<DeliveryAck, SdkError>
219    where
220        M: Serialize + SchemaValidate,
221    {
222        let request =
223            WirePublishRequest::with_idempotency_key(&self.channel_name, message, idempotency_key)?;
224        self.transport
225            .publish_with_delivery(&self.server_address, &request)
226    }
227
228    /// Performs a correlated request-reply round trip over the transport and
229    /// deserializes the reply.
230    ///
231    /// The round trip rides the conversation request-reply path: the channel name
232    /// is used as the conversation correlation id and subject, so the reply the
233    /// server returns is matched back to this request by `conversation_id` on the
234    /// single synchronous connection. Schema validation still runs on the request
235    /// so a request-reply enforces the same typing contract as `publish`.
236    ///
237    /// # Errors
238    ///
239    /// Returns [`SdkError`] when the request cannot be serialized, the round trip
240    /// fails, or the reply cannot be deserialized into `Resp`.
241    fn request_reply_blocking<Req, Resp>(&self, request: &Req) -> Result<Resp, SdkError>
242    where
243        Req: Serialize + SchemaValidate,
244        Resp: DeserializeOwned,
245    {
246        let conversation_id = ConversationId::new(self.channel_name.clone());
247        let wire_request = WireConversationRequest::new(&conversation_id, request)?;
248        let reply = self
249            .transport
250            .request_reply_conversation(&self.server_address, &wire_request)?;
251        deserialize_payload(&reply)
252    }
253}
254
255/// Conversation handle that communicates through SDK-internal wire protocol transport.
256#[derive(Clone, Debug)]
257pub struct RemoteConversationHandle {
258    server_address: ServerAddress,
259    conversation_id: ConversationId,
260    lifecycle: Arc<Mutex<ConnectionLifecycle>>,
261    transport: Arc<dyn RemoteTransport>,
262    /// Buffered reply bytes from the most recent [`request`](Self::request) round
263    /// trip, drained by the next [`receive`](ConversationHandle::receive). The
264    /// synchronous transport completes the round trip inside `request`, so the
265    /// correlated reply is held here until the caller deserializes it.
266    pending_reply: Arc<Mutex<Option<Vec<u8>>>>,
267}
268
269impl RemoteConversationHandle {
270    /// Creates a remote conversation handle from validated configuration.
271    #[must_use]
272    pub fn new(config: &RemoteConfig) -> Self {
273        Self {
274            server_address: config.server_address.clone(),
275            conversation_id: config.conversation_id.clone(),
276            lifecycle: Arc::new(Mutex::new(ConnectionLifecycle::new(
277                config.reconnect_config,
278            ))),
279            transport: Arc::clone(&config.transport),
280            pending_reply: Arc::new(Mutex::new(None)),
281        }
282    }
283
284    /// Sends a typed request on this conversation and blocks for its correlated
285    /// reply, buffering the reply for the next [`receive`](ConversationHandle::receive).
286    ///
287    /// This is the request leg of the conversation request-reply pattern. It is
288    /// kept distinct from [`send`](ConversationHandle::send), which stays
289    /// fire-and-forget (the server is silent on success): only `request` sets the
290    /// reply-requested flag and waits for the server's correlated answer. The aion
291    /// dispatch model (`send` request, then `receive` reply) maps onto a `request`
292    /// followed by `receive`.
293    ///
294    /// # Errors
295    ///
296    /// Returns [`SdkError`] when the request cannot be serialized or the round
297    /// trip fails.
298    pub fn request<Req>(&self, request: Req) -> Result<(), SdkError>
299    where
300        Req: Serialize,
301    {
302        let wire_request = WireConversationRequest::new(&self.conversation_id, &request)?;
303        let reply = self
304            .transport
305            .request_reply_conversation(&self.server_address, &wire_request)?;
306        *self.pending_reply.lock() = Some(reply);
307        Ok(())
308    }
309
310    /// Returns current lifecycle state from the SDK-003 state machine.
311    #[must_use]
312    pub fn connection_state(&self) -> ConnectionState {
313        self.lifecycle.lock().state().clone()
314    }
315
316    /// Returns remote server address used by this handle.
317    #[must_use]
318    pub const fn server_address(&self) -> &ServerAddress {
319        &self.server_address
320    }
321}
322
323impl ConversationHandle for RemoteConversationHandle {
324    type ReceiveFuture<'a, M>
325        = ReadyResult<M>
326    where
327        Self: 'a,
328        M: DeserializeOwned + 'a;
329
330    type LifecycleStream = EmptyLifecycleStream;
331
332    fn send<M>(&self, message: M) -> Result<(), SdkError>
333    where
334        M: Serialize,
335    {
336        let request = WireConversationRequest::new(&self.conversation_id, &message)?;
337        self.transport
338            .send_conversation(&self.server_address, &request)
339    }
340
341    fn receive<M>(&self) -> ReadyResult<M>
342    where
343        M: DeserializeOwned,
344    {
345        ReadyResult::new(self.receive_blocking())
346    }
347
348    fn lifecycle(&self) -> Self::LifecycleStream {
349        EmptyLifecycleStream
350    }
351}
352
353impl RemoteConversationHandle {
354    /// Drains and deserializes the correlated reply buffered by the most recent
355    /// [`request`](Self::request).
356    ///
357    /// # Errors
358    ///
359    /// Returns [`SdkError::Conversation`] when no reply is pending (no `request`
360    /// has completed since the last `receive`), or [`SdkError::Serialization`]
361    /// when the buffered reply cannot be deserialized into `M`.
362    fn receive_blocking<M>(&self) -> Result<M, SdkError>
363    where
364        M: DeserializeOwned,
365    {
366        let payload = self
367            .pending_reply
368            .lock()
369            .take()
370            .ok_or_else(|| SdkError::Conversation {
371                conversation_id: self.conversation_id.as_str().to_string(),
372                description: "no correlated reply is pending; call request before receive"
373                    .to_string(),
374            })?;
375        deserialize_payload(&payload)
376    }
377}
378
379/// Runtime-selected channel handle that keeps deployment differences behind configuration.
380#[derive(Clone, Debug)]
381pub enum SdkChannelHandle {
382    /// Embedded direct in-process handle.
383    Embedded(EmbeddedChannelHandle),
384    /// Remote protocol-backed handle.
385    Remote(RemoteChannelHandle),
386}
387
388impl SdkChannelHandle {
389    /// Creates a channel handle selected by SDK configuration.
390    ///
391    /// # Errors
392    ///
393    /// Returns [`SdkError`] if the selected handle cannot be initialized.
394    pub fn new(config: &SdkConfig) -> Result<Self, SdkError> {
395        match config {
396            SdkConfig::Embedded(config) => Ok(Self::Embedded(EmbeddedChannelHandle::new(config))),
397            SdkConfig::Remote(config) => Ok(Self::Remote(RemoteChannelHandle::new(config)?)),
398        }
399    }
400}
401
402impl ChannelHandle for SdkChannelHandle {
403    type Subscription<M>
404        = SdkSubscription<M>
405    where
406        M: DeserializeOwned;
407
408    type ReplyFuture<'a, Resp>
409        = ReadyResult<Resp>
410    where
411        Self: 'a,
412        Resp: DeserializeOwned + 'a;
413
414    fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
415    where
416        M: Serialize + SchemaValidate,
417    {
418        match self {
419            Self::Embedded(handle) => handle.publish(message),
420            Self::Remote(handle) => handle.publish(message),
421        }
422    }
423
424    fn subscribe<M>(&self) -> Self::Subscription<M>
425    where
426        M: DeserializeOwned,
427    {
428        match self {
429            Self::Embedded(handle) => handle.subscribe(),
430            Self::Remote(handle) => handle.subscribe(),
431        }
432    }
433
434    fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
435    where
436        Req: Serialize + SchemaValidate,
437        Resp: DeserializeOwned,
438    {
439        match self {
440            Self::Embedded(handle) => handle.request_reply(request),
441            Self::Remote(handle) => handle.request_reply(request),
442        }
443    }
444}
445
446/// Runtime-selected conversation handle that keeps deployment differences behind configuration.
447#[derive(Clone, Debug)]
448pub enum SdkConversationHandle {
449    /// Embedded direct in-process handle.
450    Embedded(EmbeddedConversationHandle),
451    /// Remote protocol-backed handle.
452    Remote(RemoteConversationHandle),
453}
454
455impl SdkConversationHandle {
456    /// Creates a conversation handle selected by SDK configuration.
457    ///
458    /// # Errors
459    ///
460    /// Returns [`SdkError`] if the selected handle cannot be initialized.
461    pub fn new(config: &SdkConfig) -> Result<Self, SdkError> {
462        match config {
463            SdkConfig::Embedded(config) => {
464                Ok(Self::Embedded(EmbeddedConversationHandle::new(config)))
465            }
466            SdkConfig::Remote(config) => Ok(Self::Remote(RemoteConversationHandle::new(config))),
467        }
468    }
469}
470
471impl ConversationHandle for SdkConversationHandle {
472    type ReceiveFuture<'a, M>
473        = ReadyResult<M>
474    where
475        Self: 'a,
476        M: DeserializeOwned + 'a;
477
478    type LifecycleStream = EmptyLifecycleStream;
479
480    fn send<M>(&self, message: M) -> Result<(), SdkError>
481    where
482        M: Serialize,
483    {
484        match self {
485            Self::Embedded(handle) => handle.send(message),
486            Self::Remote(handle) => handle.send(message),
487        }
488    }
489
490    fn receive<M>(&self) -> ReadyResult<M>
491    where
492        M: DeserializeOwned,
493    {
494        match self {
495            Self::Embedded(handle) => handle.receive(),
496            Self::Remote(handle) => handle.receive(),
497        }
498    }
499
500    fn lifecycle(&self) -> Self::LifecycleStream {
501        EmptyLifecycleStream
502    }
503}