liminal-sdk 0.2.2

Application-facing SDK traits for liminal messaging clients
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
use alloc::string::{String, ToString};
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::time::Duration;

use serde::Serialize;
use serde::de::DeserializeOwned;
use spin::Mutex;

use crate::connection::{
    ConnectionEvent, ConnectionLifecycle, ConnectionPool, ConnectionState, ReconnectJitter,
    ResumeRequest, SubscriptionId,
};
use crate::embedded::{
    EmbeddedChannelHandle, EmbeddedConversationHandle, EmptyLifecycleStream, ReadyResult,
    SdkSubscription,
};
use crate::{
    ChannelHandle, ConversationHandle, ConversationId, DeliveryAck, PressureResponse,
    SchemaValidate, SdkError,
};

use super::config::SdkConfig;
use super::protocol::{
    RemoteTransport, WireConversationRequest, WirePublishRequest, WireResumeRequest,
    WireSubscribeRequest, deserialize_payload,
};
use super::{RemoteConfig, ServerAddress, connection_error};

/// Mutable connection bookkeeping behind a [`RemoteChannelHandle`].
///
/// The handle stores this inside an `Arc<Mutex<_>>`, so every clone of a handle
/// reads and updates the same instance.
#[derive(Debug)]
struct RemoteChannelState {
    /// Lifecycle state machine driven by the handle's `reconnect` and `connected` methods.
    lifecycle: ConnectionLifecycle,
    /// Pool used for subscription assignment, acknowledgements, and resume requests.
    pool: ConnectionPool,
    /// Raw value of the next subscription id to hand out; starts at zero.
    next_subscription: u64,
}

/// Channel handle that communicates through SDK-internal wire protocol transport.
///
/// Clones share the same mutable state and the same transport, because both are
/// held behind an `Arc`.
#[derive(Clone, Debug)]
pub struct RemoteChannelHandle {
    /// Address handed to every transport call made by this handle.
    server_address: ServerAddress,
    /// Channel name placed in publish and subscribe wire requests; it is also
    /// reused as the conversation id for request-reply round trips.
    channel_name: String,
    /// Lifecycle, connection pool, and subscription id counter, guarded by a mutex.
    state: Arc<Mutex<RemoteChannelState>>,
    /// Transport that carries the wire requests built by this handle.
    transport: Arc<dyn RemoteTransport>,
}

impl RemoteChannelHandle {
    /// Builds a handle for the channel described by `config`.
    ///
    /// The lifecycle state machine and the connection pool are created from the
    /// reconnect and pool settings in `config`, the subscription id counter
    /// starts at zero, and the transport is shared with `config`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] if the connection pool cannot be created.
    pub fn new(config: &RemoteConfig) -> Result<Self, SdkError> {
        let server_address = config.server_address.clone();
        let channel_name = config.channel_name.clone();
        let lifecycle = ConnectionLifecycle::new(config.reconnect_config);
        let pool = ConnectionPool::new(config.pool_config, config.reconnect_config)?;
        let initial = RemoteChannelState {
            lifecycle,
            pool,
            next_subscription: 0,
        };

        Ok(Self {
            server_address,
            channel_name,
            state: Arc::new(Mutex::new(initial)),
            transport: Arc::clone(&config.transport),
        })
    }

    /// Returns a snapshot of the current SDK-003 lifecycle state.
    #[must_use]
    pub fn connection_state(&self) -> ConnectionState {
        let guard = self.state.lock();
        guard.lifecycle.state().clone()
    }

    /// Asks the SDK-003 lifecycle state machine to start a reconnect attempt.
    ///
    /// The call is forwarded to the lifecycle while the state lock is held and
    /// the lifecycle's result is returned unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] when the lifecycle rejects the transition.
    pub fn reconnect<J>(&self, jitter: &mut J) -> Result<Duration, SdkError>
    where
        J: ReconnectJitter + ?Sized,
    {
        let mut guard = self.state.lock();
        guard.lifecycle.reconnect(jitter)
    }

    /// Moves the lifecycle to connected and sends a resume request for every
    /// entry the pool reports for that transition.
    ///
    /// The requests that were sent are returned to the caller.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] if lifecycle transition or recovery state is invalid,
    /// or if the transport fails to send one of the resume requests.
    pub fn connected(&self) -> Result<Vec<ResumeRequest>, SdkError> {
        // Phase one runs entirely under the lock: apply the transition and ask the
        // pool what has to be resumed. The guard is dropped at the end of this
        // block so the transport calls below never run while the spin lock is held.
        let pending = {
            let mut guard = self.state.lock();
            let before = guard.lifecycle.state().clone();
            guard.lifecycle.connected()?;
            let after = guard.lifecycle.state().clone();
            let transition = ConnectionEvent::new(before, after);
            guard.pool.resume_requests_for_transition(&transition)?
        };

        // Phase two: send the resume requests in order, stopping at the first failure.
        for resume in &pending {
            let wire = WireResumeRequest::new(*resume);
            self.transport.resume(&self.server_address, &wire)?;
        }

        Ok(pending)
    }

    /// Allocates the next subscription id and assigns it to the configured pool.
    ///
    /// The id counter is advanced before the pool assignment is attempted.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] if a subscription id overflows or pool assignment fails.
    pub fn track_subscription(&self) -> Result<SubscriptionId, SdkError> {
        let mut guard = self.state.lock();
        let current = guard.next_subscription;
        let id = SubscriptionId::new(current);

        match current.checked_add(1) {
            Some(following) => guard.next_subscription = following,
            None => {
                return Err(SdkError::Store {
                    description: "subscription id overflow".to_string(),
                });
            }
        }

        guard.pool.assign_subscription(id)?;
        Ok(id)
    }

    /// Records `sequence` as acknowledged for `subscription_id` in the pool.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] if the subscription is not active in the pool.
    pub fn acknowledge(
        &self,
        subscription_id: SubscriptionId,
        sequence: u64,
    ) -> Result<(), SdkError> {
        self.state.lock().pool.acknowledge(subscription_id, sequence)
    }

    /// Returns the remote server address this handle talks to.
    #[must_use]
    pub const fn server_address(&self) -> &ServerAddress {
        &self.server_address
    }
}

impl ChannelHandle for RemoteChannelHandle {
    type Subscription<M>
        = SdkSubscription<M>
    where
        M: DeserializeOwned;

    type ReplyFuture<'a, Resp>
        = ReadyResult<Resp>
    where
        Self: 'a,
        Resp: DeserializeOwned + 'a;

    fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
    where
        M: Serialize + SchemaValidate,
    {
        let request = WirePublishRequest::new(&self.channel_name, &message)?;
        self.transport.publish(&self.server_address, &request)
    }

    fn subscribe<M>(&self) -> Self::Subscription<M>
    where
        M: DeserializeOwned,
    {
        let outcome = self.track_subscription().and_then(|subscription_id| {
            let connection_id = self
                .state
                .lock()
                .pool
                .connection_for_subscription(subscription_id)
                .ok_or_else(|| connection_error("subscription was not assigned to the pool"))?;
            let request = WireSubscribeRequest::new(
                &self.channel_name,
                subscription_id,
                connection_id.get(),
            )?;
            self.transport.subscribe(&self.server_address, &request)
        });

        match outcome {
            Ok(()) => SdkSubscription::empty(),
            Err(error) => SdkSubscription::error(error),
        }
    }

    fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
    where
        Req: Serialize + SchemaValidate,
        Resp: DeserializeOwned,
    {
        ReadyResult::new(self.request_reply_blocking(&request))
    }
}

impl RemoteChannelHandle {
    /// Publishes `message` tagged with `idempotency_key` and returns the
    /// delivery ack produced by the transport.
    ///
    /// The key is what the server uses for dedup-on-delivery: re-publishing the
    /// same key is delivered to subscribers at most once. The returned
    /// [`DeliveryAck`] tells the caller whether a subscriber genuinely accepted
    /// this publish ([`DeliveryAck::is_accepted`]), so a caller such as the aion
    /// outbox can treat the send as done only on real acceptance. Contrast with
    /// [`publish`](ChannelHandle::publish), which only reports backpressure.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] when the message cannot be serialized, the round trip
    /// fails, or the transport cannot produce a genuine delivery ack.
    pub fn publish_with_idempotency_key<M>(
        &self,
        message: &M,
        idempotency_key: &str,
    ) -> Result<DeliveryAck, SdkError>
    where
        M: Serialize + SchemaValidate,
    {
        let wire = WirePublishRequest::with_idempotency_key(
            &self.channel_name,
            message,
            idempotency_key,
        )?;
        self.transport
            .publish_with_delivery(&self.server_address, &wire)
    }

    /// Sends `request` as a correlated request-reply round trip and decodes the
    /// reply into `Resp`.
    ///
    /// The round trip uses the conversation request-reply path, with the channel
    /// name serving as the conversation correlation id and subject. The server's
    /// reply is therefore matched to this request by `conversation_id` on the
    /// single synchronous connection. The `SchemaValidate` bound keeps the same
    /// typing contract on the request as `publish`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] when the request cannot be serialized, the round trip
    /// fails, or the reply cannot be deserialized into `Resp`.
    fn request_reply_blocking<Req, Resp>(&self, request: &Req) -> Result<Resp, SdkError>
    where
        Req: Serialize + SchemaValidate,
        Resp: DeserializeOwned,
    {
        // The channel name doubles as the correlation id for this round trip.
        let correlation = ConversationId::new(self.channel_name.clone());
        let wire = WireConversationRequest::new(&correlation, request)?;
        let reply_bytes = self
            .transport
            .request_reply_conversation(&self.server_address, &wire)?;

        deserialize_payload(&reply_bytes)
    }
}

/// Conversation handle that communicates through SDK-internal wire protocol transport.
///
/// Clones share the lifecycle, the transport, and the pending-reply buffer,
/// because each of those is held behind an `Arc`.
#[derive(Clone, Debug)]
pub struct RemoteConversationHandle {
    /// Address handed to every transport call made by this handle.
    server_address: ServerAddress,
    /// Conversation id placed in every wire request built by this handle.
    conversation_id: ConversationId,
    /// Lifecycle state machine; within this file it is only read, by `connection_state`.
    lifecycle: Arc<Mutex<ConnectionLifecycle>>,
    /// Transport that carries the wire requests built by this handle.
    transport: Arc<dyn RemoteTransport>,
    /// Buffered reply bytes from the most recent [`request`](Self::request) round
    /// trip, drained by the next [`receive`](ConversationHandle::receive). The
    /// synchronous transport completes the round trip inside `request`, so the
    /// correlated reply is held here until the caller deserializes it.
    pending_reply: Arc<Mutex<Option<Vec<u8>>>>,
}

impl RemoteConversationHandle {
    /// Builds a conversation handle from validated configuration.
    ///
    /// The handle starts with a fresh lifecycle state machine and an empty
    /// pending-reply buffer, and shares the transport stored in `config`.
    #[must_use]
    pub fn new(config: &RemoteConfig) -> Self {
        let server_address = config.server_address.clone();
        let conversation_id = config.conversation_id.clone();
        let lifecycle = ConnectionLifecycle::new(config.reconnect_config);

        Self {
            server_address,
            conversation_id,
            lifecycle: Arc::new(Mutex::new(lifecycle)),
            transport: Arc::clone(&config.transport),
            pending_reply: Arc::new(Mutex::new(None)),
        }
    }

    /// Sends a typed request on this conversation, waits for the correlated
    /// reply, and stores that reply for the next
    /// [`receive`](ConversationHandle::receive).
    ///
    /// This is the request half of the conversation request-reply pattern and is
    /// deliberately separate from [`send`](ConversationHandle::send): `send`
    /// remains fire-and-forget (the server is silent on success), whereas only
    /// `request` sets the reply-requested flag and waits for the server's
    /// correlated answer. The aion dispatch model (`send` request, then `receive`
    /// reply) corresponds to a `request` followed by `receive`.
    ///
    /// A reply that is still buffered from an earlier `request` is replaced.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] when the request cannot be serialized or the round
    /// trip fails.
    pub fn request<Req>(&self, request: Req) -> Result<(), SdkError>
    where
        Req: Serialize,
    {
        let wire = WireConversationRequest::new(&self.conversation_id, &request)?;
        let reply_bytes = self
            .transport
            .request_reply_conversation(&self.server_address, &wire)?;

        // The buffer is locked only after the round trip has completed.
        let mut slot = self.pending_reply.lock();
        *slot = Some(reply_bytes);
        Ok(())
    }

    /// Returns a snapshot of the current SDK-003 lifecycle state.
    #[must_use]
    pub fn connection_state(&self) -> ConnectionState {
        let guard = self.lifecycle.lock();
        guard.state().clone()
    }

    /// Returns the remote server address this handle talks to.
    #[must_use]
    pub const fn server_address(&self) -> &ServerAddress {
        &self.server_address
    }
}

impl ConversationHandle for RemoteConversationHandle {
    type ReceiveFuture<'a, M>
        = ReadyResult<M>
    where
        Self: 'a,
        M: DeserializeOwned + 'a;

    type LifecycleStream = EmptyLifecycleStream;

    /// Wraps `message` in a conversation wire request and passes it to the
    /// transport's `send_conversation`; no reply is buffered by this call.
    fn send<M>(&self, message: M) -> Result<(), SdkError>
    where
        M: Serialize,
    {
        let wire = WireConversationRequest::new(&self.conversation_id, &message)?;
        self.transport
            .send_conversation(&self.server_address, &wire)
    }

    /// Drains the buffered reply, if any, and wraps the decode outcome in an
    /// already-completed [`ReadyResult`].
    fn receive<M>(&self) -> ReadyResult<M>
    where
        M: DeserializeOwned,
    {
        let outcome = self.receive_blocking::<M>();
        ReadyResult::new(outcome)
    }

    /// Returns an [`EmptyLifecycleStream`].
    fn lifecycle(&self) -> Self::LifecycleStream {
        EmptyLifecycleStream
    }
}

impl RemoteConversationHandle {
    /// Takes the reply buffered by the most recent [`request`](Self::request)
    /// out of the buffer and deserializes it into `M`.
    ///
    /// The buffer is left empty afterwards, whether or not decoding succeeds.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError::Conversation`] when no reply is pending (no `request`
    /// has completed since the last `receive`), or [`SdkError::Serialization`]
    /// when the buffered reply cannot be deserialized into `M`.
    fn receive_blocking<M>(&self) -> Result<M, SdkError>
    where
        M: DeserializeOwned,
    {
        // Take the bytes first so the buffer lock is released before decoding.
        let buffered = self.pending_reply.lock().take();

        match buffered {
            Some(bytes) => deserialize_payload(&bytes),
            None => Err(SdkError::Conversation {
                conversation_id: self.conversation_id.as_str().to_string(),
                description: "no correlated reply is pending; call request before receive"
                    .to_string(),
            }),
        }
    }
}

/// Runtime-selected channel handle that keeps deployment differences behind configuration.
///
/// The [`ChannelHandle`] implementation for this enum forwards every call to
/// whichever variant is held.
#[derive(Clone, Debug)]
pub enum SdkChannelHandle {
    /// Embedded direct in-process handle.
    Embedded(EmbeddedChannelHandle),
    /// Remote protocol-backed handle.
    Remote(RemoteChannelHandle),
}

impl SdkChannelHandle {
    /// Builds the channel handle variant that matches the variant of `config`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] if the selected handle cannot be initialized.
    pub fn new(config: &SdkConfig) -> Result<Self, SdkError> {
        let handle = match config {
            SdkConfig::Embedded(embedded) => Self::Embedded(EmbeddedChannelHandle::new(embedded)),
            // Only the remote constructor is fallible in this match.
            SdkConfig::Remote(remote) => Self::Remote(RemoteChannelHandle::new(remote)?),
        };
        Ok(handle)
    }
}

impl ChannelHandle for SdkChannelHandle {
    type Subscription<M>
        = SdkSubscription<M>
    where
        M: DeserializeOwned;

    type ReplyFuture<'a, Resp>
        = ReadyResult<Resp>
    where
        Self: 'a,
        Resp: DeserializeOwned + 'a;

    /// Forwards the publish to the wrapped handle.
    fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
    where
        M: Serialize + SchemaValidate,
    {
        match self {
            Self::Remote(remote) => remote.publish(message),
            Self::Embedded(embedded) => embedded.publish(message),
        }
    }

    /// Forwards the subscribe to the wrapped handle.
    fn subscribe<M>(&self) -> Self::Subscription<M>
    where
        M: DeserializeOwned,
    {
        match self {
            Self::Remote(remote) => remote.subscribe(),
            Self::Embedded(embedded) => embedded.subscribe(),
        }
    }

    /// Forwards the request-reply to the wrapped handle.
    fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
    where
        Req: Serialize + SchemaValidate,
        Resp: DeserializeOwned,
    {
        match self {
            Self::Remote(remote) => remote.request_reply(request),
            Self::Embedded(embedded) => embedded.request_reply(request),
        }
    }
}

/// Runtime-selected conversation handle that keeps deployment differences behind configuration.
///
/// The [`ConversationHandle`] implementation for this enum forwards `send` and
/// `receive` to whichever variant is held.
#[derive(Clone, Debug)]
pub enum SdkConversationHandle {
    /// Embedded direct in-process handle.
    Embedded(EmbeddedConversationHandle),
    /// Remote protocol-backed handle.
    Remote(RemoteConversationHandle),
}

impl SdkConversationHandle {
    /// Builds the conversation handle variant that matches the variant of `config`.
    ///
    /// # Errors
    ///
    /// Returns [`SdkError`] if the selected handle cannot be initialized.
    pub fn new(config: &SdkConfig) -> Result<Self, SdkError> {
        let handle = match config {
            SdkConfig::Embedded(embedded) => {
                Self::Embedded(EmbeddedConversationHandle::new(embedded))
            }
            SdkConfig::Remote(remote) => Self::Remote(RemoteConversationHandle::new(remote)),
        };
        Ok(handle)
    }
}

impl ConversationHandle for SdkConversationHandle {
    type ReceiveFuture<'a, M>
        = ReadyResult<M>
    where
        Self: 'a,
        M: DeserializeOwned + 'a;

    type LifecycleStream = EmptyLifecycleStream;

    /// Forwards the send to the wrapped handle.
    fn send<M>(&self, message: M) -> Result<(), SdkError>
    where
        M: Serialize,
    {
        match self {
            Self::Remote(remote) => remote.send(message),
            Self::Embedded(embedded) => embedded.send(message),
        }
    }

    /// Forwards the receive to the wrapped handle.
    fn receive<M>(&self) -> ReadyResult<M>
    where
        M: DeserializeOwned,
    {
        match self {
            Self::Remote(remote) => remote.receive(),
            Self::Embedded(embedded) => embedded.receive(),
        }
    }

    /// Returns an [`EmptyLifecycleStream`] without consulting the wrapped handle.
    fn lifecycle(&self) -> Self::LifecycleStream {
        EmptyLifecycleStream
    }
}