1use alloc::string::{String, ToString};
2use alloc::sync::Arc;
3use core::fmt;
4use core::marker::PhantomData;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7
8use futures_core::Stream;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::{
13 ChannelHandle, ConversationEvent, ConversationHandle, ConversationId, PressureResponse,
14 SchemaMetadata, SchemaValidate, SdkError,
15};
16
17#[cfg(test)]
18mod tests;
19
20pub trait EmbeddedChannelMessage {
25 #[must_use]
27 fn schema_metadata(&self) -> SchemaMetadata;
28
29 #[must_use]
31 fn type_name(&self) -> &'static str;
32}
33
34impl<M> EmbeddedChannelMessage for M
35where
36 M: Serialize + SchemaValidate,
37{
38 fn schema_metadata(&self) -> SchemaMetadata {
39 M::schema_metadata()
40 }
41
42 fn type_name(&self) -> &'static str {
43 core::any::type_name::<M>()
44 }
45}
46
47pub trait EmbeddedConversationMessage {
49 #[must_use]
51 fn type_name(&self) -> &'static str;
52}
53
54impl<M> EmbeddedConversationMessage for M
55where
56 M: Serialize,
57{
58 fn type_name(&self) -> &'static str {
59 core::any::type_name::<M>()
60 }
61}
62
63pub struct SdkSubscription<M> {
65 pending_error: Option<SdkError>,
66 message: PhantomData<M>,
67}
68
69impl<M> SdkSubscription<M> {
70 #[must_use]
72 pub const fn empty() -> Self {
73 Self {
74 pending_error: None,
75 message: PhantomData,
76 }
77 }
78
79 #[must_use]
81 pub const fn error(error: SdkError) -> Self {
82 Self {
83 pending_error: Some(error),
84 message: PhantomData,
85 }
86 }
87
88 #[must_use]
90 pub const fn is_empty(&self) -> bool {
91 self.pending_error.is_none()
92 }
93}
94
95impl<M> Default for SdkSubscription<M> {
96 fn default() -> Self {
97 Self::empty()
98 }
99}
100
101impl<M> fmt::Debug for SdkSubscription<M> {
102 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
103 formatter
104 .debug_struct("SdkSubscription")
105 .field("has_pending_error", &self.pending_error.is_some())
106 .finish()
107 }
108}
109
110impl<M> Unpin for SdkSubscription<M> {}
111
112impl<M> Stream for SdkSubscription<M> {
113 type Item = Result<M, SdkError>;
114
115 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
116 core::hint::black_box(context.waker());
117 Poll::Ready(self.pending_error.take().map(Err))
118 }
119}
120
121#[derive(Clone, Debug, Default)]
123pub struct EmptyLifecycleStream;
124
125impl Unpin for EmptyLifecycleStream {}
126
127impl Stream for EmptyLifecycleStream {
128 type Item = ConversationEvent;
129
130 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
131 core::hint::black_box((self, context.waker()));
132 Poll::Ready(None)
133 }
134}
135
136pub struct ReadyResult<T> {
138 result: Option<Result<T, SdkError>>,
139}
140
141impl<T> fmt::Debug for ReadyResult<T> {
142 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
143 formatter
144 .debug_struct("ReadyResult")
145 .field("is_ready", &self.result.is_some())
146 .finish()
147 }
148}
149
150impl<T> ReadyResult<T> {
151 #[must_use]
153 pub const fn new(result: Result<T, SdkError>) -> Self {
154 Self {
155 result: Some(result),
156 }
157 }
158}
159
160impl<T> Unpin for ReadyResult<T> {}
161
162impl<T> core::future::Future for ReadyResult<T> {
163 type Output = Result<T, SdkError>;
164
165 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
166 core::hint::black_box(context.waker());
167 let Some(result) = self.result.take() else {
168 return Poll::Ready(Err(SdkError::Protocol {
169 description: "ready future polled after completion".to_string(),
170 }));
171 };
172
173 Poll::Ready(result)
174 }
175}
176
177pub trait EmbeddedChannelBackend: fmt::Debug + Send + Sync {
179 fn publish(&self, message: &dyn EmbeddedChannelMessage) -> Result<PressureResponse, SdkError>;
185}
186
187pub trait EmbeddedConversationBackend: fmt::Debug + Send + Sync {
189 fn send(&self, message: &dyn EmbeddedConversationMessage) -> Result<(), SdkError>;
195}
196
197#[derive(Clone, Debug, Default)]
199pub struct DirectEmbeddedChannelBackend;
200
201impl EmbeddedChannelBackend for DirectEmbeddedChannelBackend {
202 fn publish(&self, message: &dyn EmbeddedChannelMessage) -> Result<PressureResponse, SdkError> {
203 let schema = message.schema_metadata();
204 core::hint::black_box(&schema);
205 Ok(PressureResponse::Accept)
206 }
207}
208
209#[derive(Clone, Debug, Default)]
211pub struct DirectEmbeddedConversationBackend;
212
213impl EmbeddedConversationBackend for DirectEmbeddedConversationBackend {
214 fn send(&self, message: &dyn EmbeddedConversationMessage) -> Result<(), SdkError> {
215 let type_name = message.type_name();
216 core::hint::black_box(type_name);
217 Ok(())
218 }
219}
220
221#[derive(Clone, Debug)]
223pub struct EmbeddedConfig {
224 pub channel_name: String,
226 pub conversation_id: ConversationId,
228 pub channel_backend: Arc<dyn EmbeddedChannelBackend>,
230 pub conversation_backend: Arc<dyn EmbeddedConversationBackend>,
232}
233
234impl EmbeddedConfig {
235 #[must_use]
237 pub fn new(
238 channel_name: impl Into<String>,
239 conversation_id: impl Into<ConversationId>,
240 ) -> Self {
241 Self {
242 channel_name: channel_name.into(),
243 conversation_id: conversation_id.into(),
244 channel_backend: Arc::new(DirectEmbeddedChannelBackend),
245 conversation_backend: Arc::new(DirectEmbeddedConversationBackend),
246 }
247 }
248
249 #[must_use]
251 pub fn with_channel_backend(mut self, backend: Arc<dyn EmbeddedChannelBackend>) -> Self {
252 self.channel_backend = backend;
253 self
254 }
255
256 #[must_use]
258 pub fn with_conversation_backend(
259 mut self,
260 backend: Arc<dyn EmbeddedConversationBackend>,
261 ) -> Self {
262 self.conversation_backend = backend;
263 self
264 }
265}
266
267#[derive(Clone, Debug)]
269pub struct EmbeddedChannelHandle {
270 channel_name: String,
271 backend: Arc<dyn EmbeddedChannelBackend>,
272}
273
274impl EmbeddedChannelHandle {
275 #[must_use]
277 pub fn new(config: &EmbeddedConfig) -> Self {
278 Self {
279 channel_name: config.channel_name.clone(),
280 backend: Arc::clone(&config.channel_backend),
281 }
282 }
283
284 #[must_use]
286 pub fn channel_name(&self) -> &str {
287 self.channel_name.as_str()
288 }
289}
290
291impl ChannelHandle for EmbeddedChannelHandle {
292 type Subscription<M>
293 = SdkSubscription<M>
294 where
295 M: DeserializeOwned;
296
297 type ReplyFuture<'a, Resp>
298 = ReadyResult<Resp>
299 where
300 Self: 'a,
301 Resp: DeserializeOwned + 'a;
302
303 fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
304 where
305 M: Serialize + SchemaValidate,
306 {
307 self.backend.publish(&message)
308 }
309
310 fn subscribe<M>(&self) -> Self::Subscription<M>
311 where
312 M: DeserializeOwned,
313 {
314 SdkSubscription::empty()
315 }
316
317 fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
318 where
319 Req: Serialize + SchemaValidate,
320 Resp: DeserializeOwned,
321 {
322 let schema = Req::schema_metadata();
323 core::hint::black_box((&request, &schema));
324 ReadyResult::new(Err(SdkError::Protocol {
325 description: "embedded request/reply requires an in-process responder backend"
326 .to_string(),
327 }))
328 }
329}
330
331#[derive(Clone, Debug)]
333pub struct EmbeddedConversationHandle {
334 conversation_id: ConversationId,
335 backend: Arc<dyn EmbeddedConversationBackend>,
336}
337
338impl EmbeddedConversationHandle {
339 #[must_use]
341 pub fn new(config: &EmbeddedConfig) -> Self {
342 Self {
343 conversation_id: config.conversation_id.clone(),
344 backend: Arc::clone(&config.conversation_backend),
345 }
346 }
347
348 #[must_use]
350 pub const fn conversation_id(&self) -> &ConversationId {
351 &self.conversation_id
352 }
353}
354
355impl ConversationHandle for EmbeddedConversationHandle {
356 type ReceiveFuture<'a, M>
357 = ReadyResult<M>
358 where
359 Self: 'a,
360 M: DeserializeOwned + 'a;
361
362 type LifecycleStream = EmptyLifecycleStream;
363
364 fn send<M>(&self, message: M) -> Result<(), SdkError>
365 where
366 M: Serialize,
367 {
368 self.backend.send(&message)
369 }
370
371 fn receive<M>(&self) -> ReadyResult<M>
372 where
373 M: DeserializeOwned,
374 {
375 ReadyResult::new(Err(SdkError::Conversation {
376 conversation_id: self.conversation_id.as_str().to_string(),
377 description: "embedded receive requires an in-process inbox backend".to_string(),
378 }))
379 }
380
381 fn lifecycle(&self) -> Self::LifecycleStream {
382 EmptyLifecycleStream
383 }
384}