Skip to main content

liminal_sdk/
embedded.rs

1use alloc::string::{String, ToString};
2use alloc::sync::Arc;
3use core::fmt;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use futures_core::Stream;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::{
13    ChannelHandle, ConversationEvent, ConversationHandle, ConversationId, PressureResponse,
14    SchemaMetadata, SchemaValidate, SdkError,
15};
16
17#[cfg(test)]
18mod tests;
19
20/// Type-erased view of a typed channel message passed to an embedded backend.
21///
22/// The embedded SDK path hands this reference directly to the in-process backend;
23/// it never asks serde to produce bytes, protocol envelopes, or wire frames.
24pub trait EmbeddedChannelMessage {
25    /// Returns the compile-time schema metadata declared by the message type.
26    #[must_use]
27    fn schema_metadata(&self) -> SchemaMetadata;
28
29    /// Returns the Rust type name used for diagnostics and backend routing.
30    #[must_use]
31    fn type_name(&self) -> &'static str;
32}
33
34impl<M> EmbeddedChannelMessage for M
35where
36    M: Serialize + SchemaValidate,
37{
38    fn schema_metadata(&self) -> SchemaMetadata {
39        M::schema_metadata()
40    }
41
42    fn type_name(&self) -> &'static str {
43        core::any::type_name::<M>()
44    }
45}
46
47/// Type-erased view of a typed conversation message passed to an embedded backend.
48pub trait EmbeddedConversationMessage {
49    /// Returns the Rust type name used for diagnostics and backend routing.
50    #[must_use]
51    fn type_name(&self) -> &'static str;
52}
53
54impl<M> EmbeddedConversationMessage for M
55where
56    M: Serialize,
57{
58    fn type_name(&self) -> &'static str {
59        core::any::type_name::<M>()
60    }
61}
62
63/// Stream used by SDK handles when no typed messages are buffered locally.
64pub struct SdkSubscription<M> {
65    pending_error: Option<SdkError>,
66    message: PhantomData<M>,
67}
68
69impl<M> SdkSubscription<M> {
70    /// Creates an empty typed subscription stream.
71    #[must_use]
72    pub const fn empty() -> Self {
73        Self {
74            pending_error: None,
75            message: PhantomData,
76        }
77    }
78
79    /// Creates a subscription stream that surfaces a setup error to the caller.
80    #[must_use]
81    pub const fn error(error: SdkError) -> Self {
82        Self {
83            pending_error: Some(error),
84            message: PhantomData,
85        }
86    }
87
88    /// Returns true when this local subscription has no pending setup error.
89    #[must_use]
90    pub const fn is_empty(&self) -> bool {
91        self.pending_error.is_none()
92    }
93}
94
95impl<M> Default for SdkSubscription<M> {
96    fn default() -> Self {
97        Self::empty()
98    }
99}
100
101impl<M> fmt::Debug for SdkSubscription<M> {
102    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
103        formatter
104            .debug_struct("SdkSubscription")
105            .field("has_pending_error", &self.pending_error.is_some())
106            .finish()
107    }
108}
109
110impl<M> Unpin for SdkSubscription<M> {}
111
112impl<M> Stream for SdkSubscription<M> {
113    type Item = Result<M, SdkError>;
114
115    fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116        core::hint::black_box(context.waker());
117        Poll::Ready(self.pending_error.take().map(Err))
118    }
119}
120
121/// Stream used when a conversation has no lifecycle events buffered locally.
122#[derive(Clone, Debug, Default)]
123pub struct EmptyLifecycleStream;
124
125impl Unpin for EmptyLifecycleStream {}
126
127impl Stream for EmptyLifecycleStream {
128    type Item = ConversationEvent;
129
130    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
131        core::hint::black_box((self, context.waker()));
132        Poll::Ready(None)
133    }
134}
135
136/// Ready future used by SDK handles that complete an operation synchronously.
137pub struct ReadyResult<T> {
138    result: Option<Result<T, SdkError>>,
139}
140
141impl<T> fmt::Debug for ReadyResult<T> {
142    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
143        formatter
144            .debug_struct("ReadyResult")
145            .field("is_ready", &self.result.is_some())
146            .finish()
147    }
148}
149
150impl<T> ReadyResult<T> {
151    /// Creates a ready future from a result.
152    #[must_use]
153    pub const fn new(result: Result<T, SdkError>) -> Self {
154        Self {
155            result: Some(result),
156        }
157    }
158}
159
160impl<T> Unpin for ReadyResult<T> {}
161
162impl<T> core::future::Future for ReadyResult<T> {
163    type Output = Result<T, SdkError>;
164
165    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
166        core::hint::black_box(context.waker());
167        let Some(result) = self.result.take() else {
168            return Poll::Ready(Err(SdkError::Protocol {
169                description: "ready future polled after completion".to_string(),
170            }));
171        };
172
173        Poll::Ready(result)
174    }
175}
176
177/// Direct in-process channel backend used by embedded handles.
178pub trait EmbeddedChannelBackend: fmt::Debug + Send + Sync {
179    /// Publishes a typed message reference without protocol framing or wire encoding.
180    ///
181    /// # Errors
182    ///
183    /// Returns [`SdkError`] if the in-process backend rejects the publish attempt.
184    fn publish(&self, message: &dyn EmbeddedChannelMessage) -> Result<PressureResponse, SdkError>;
185}
186
187/// Direct in-process conversation backend used by embedded handles.
188pub trait EmbeddedConversationBackend: fmt::Debug + Send + Sync {
189    /// Sends a typed message reference without protocol framing or wire encoding.
190    ///
191    /// # Errors
192    ///
193    /// Returns [`SdkError`] if the in-process backend rejects the send attempt.
194    fn send(&self, message: &dyn EmbeddedConversationMessage) -> Result<(), SdkError>;
195}
196
197/// Minimal in-process channel backend that accepts messages immediately.
198#[derive(Clone, Debug, Default)]
199pub struct DirectEmbeddedChannelBackend;
200
201impl EmbeddedChannelBackend for DirectEmbeddedChannelBackend {
202    fn publish(&self, message: &dyn EmbeddedChannelMessage) -> Result<PressureResponse, SdkError> {
203        let schema = message.schema_metadata();
204        core::hint::black_box(&schema);
205        Ok(PressureResponse::Accept)
206    }
207}
208
209/// Minimal in-process conversation backend that accepts sends immediately.
210#[derive(Clone, Debug, Default)]
211pub struct DirectEmbeddedConversationBackend;
212
213impl EmbeddedConversationBackend for DirectEmbeddedConversationBackend {
214    fn send(&self, message: &dyn EmbeddedConversationMessage) -> Result<(), SdkError> {
215        let type_name = message.type_name();
216        core::hint::black_box(type_name);
217        Ok(())
218    }
219}
220
221/// Configuration for embedded SDK handles.
222#[derive(Clone, Debug)]
223pub struct EmbeddedConfig {
224    /// Application-visible channel name.
225    pub channel_name: String,
226    /// Application-visible conversation identifier.
227    pub conversation_id: ConversationId,
228    /// Direct channel backend used for in-process publication.
229    pub channel_backend: Arc<dyn EmbeddedChannelBackend>,
230    /// Direct conversation backend used for in-process conversation sends.
231    pub conversation_backend: Arc<dyn EmbeddedConversationBackend>,
232}
233
234impl EmbeddedConfig {
235    /// Creates embedded configuration without requiring a server address.
236    #[must_use]
237    pub fn new(
238        channel_name: impl Into<String>,
239        conversation_id: impl Into<ConversationId>,
240    ) -> Self {
241        Self {
242            channel_name: channel_name.into(),
243            conversation_id: conversation_id.into(),
244            channel_backend: Arc::new(DirectEmbeddedChannelBackend),
245            conversation_backend: Arc::new(DirectEmbeddedConversationBackend),
246        }
247    }
248
249    /// Replaces the direct in-process channel backend.
250    #[must_use]
251    pub fn with_channel_backend(mut self, backend: Arc<dyn EmbeddedChannelBackend>) -> Self {
252        self.channel_backend = backend;
253        self
254    }
255
256    /// Replaces the direct in-process conversation backend.
257    #[must_use]
258    pub fn with_conversation_backend(
259        mut self,
260        backend: Arc<dyn EmbeddedConversationBackend>,
261    ) -> Self {
262        self.conversation_backend = backend;
263        self
264    }
265}
266
267/// Channel handle that publishes through direct in-process references.
268#[derive(Clone, Debug)]
269pub struct EmbeddedChannelHandle {
270    channel_name: String,
271    backend: Arc<dyn EmbeddedChannelBackend>,
272}
273
274impl EmbeddedChannelHandle {
275    /// Creates an embedded channel handle from direct in-process configuration.
276    #[must_use]
277    pub fn new(config: &EmbeddedConfig) -> Self {
278        Self {
279            channel_name: config.channel_name.clone(),
280            backend: Arc::clone(&config.channel_backend),
281        }
282    }
283
284    /// Returns the application-visible channel name.
285    #[must_use]
286    pub fn channel_name(&self) -> &str {
287        self.channel_name.as_str()
288    }
289}
290
291impl ChannelHandle for EmbeddedChannelHandle {
292    type Subscription<M>
293        = SdkSubscription<M>
294    where
295        M: DeserializeOwned;
296
297    type ReplyFuture<'a, Resp>
298        = ReadyResult<Resp>
299    where
300        Self: 'a,
301        Resp: DeserializeOwned + 'a;
302
303    fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
304    where
305        M: Serialize + SchemaValidate,
306    {
307        self.backend.publish(&message)
308    }
309
310    fn subscribe<M>(&self) -> Self::Subscription<M>
311    where
312        M: DeserializeOwned,
313    {
314        SdkSubscription::empty()
315    }
316
317    fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
318    where
319        Req: Serialize + SchemaValidate,
320        Resp: DeserializeOwned,
321    {
322        let schema = Req::schema_metadata();
323        core::hint::black_box((&request, &schema));
324        ReadyResult::new(Err(SdkError::Protocol {
325            description: "embedded request/reply requires an in-process responder backend"
326                .to_string(),
327        }))
328    }
329}
330
331/// Conversation handle that sends through direct in-process references.
332#[derive(Clone, Debug)]
333pub struct EmbeddedConversationHandle {
334    conversation_id: ConversationId,
335    backend: Arc<dyn EmbeddedConversationBackend>,
336}
337
338impl EmbeddedConversationHandle {
339    /// Creates an embedded conversation handle from direct in-process configuration.
340    #[must_use]
341    pub fn new(config: &EmbeddedConfig) -> Self {
342        Self {
343            conversation_id: config.conversation_id.clone(),
344            backend: Arc::clone(&config.conversation_backend),
345        }
346    }
347
348    /// Returns the application-visible conversation identifier.
349    #[must_use]
350    pub const fn conversation_id(&self) -> &ConversationId {
351        &self.conversation_id
352    }
353}
354
355impl ConversationHandle for EmbeddedConversationHandle {
356    type ReceiveFuture<'a, M>
357        = ReadyResult<M>
358    where
359        Self: 'a,
360        M: DeserializeOwned + 'a;
361
362    type LifecycleStream = EmptyLifecycleStream;
363
364    fn send<M>(&self, message: M) -> Result<(), SdkError>
365    where
366        M: Serialize,
367    {
368        self.backend.send(&message)
369    }
370
371    fn receive<M>(&self) -> ReadyResult<M>
372    where
373        M: DeserializeOwned,
374    {
375        ReadyResult::new(Err(SdkError::Conversation {
376            conversation_id: self.conversation_id.as_str().to_string(),
377            description: "embedded receive requires an in-process inbox backend".to_string(),
378        }))
379    }
380
381    fn lifecycle(&self) -> Self::LifecycleStream {
382        EmptyLifecycleStream
383    }
384}