1use alloc::string::{String, ToString};
2use alloc::sync::Arc;
3use alloc::vec::Vec;
4use core::time::Duration;
5
6use serde::Serialize;
7use serde::de::DeserializeOwned;
8use spin::Mutex;
9
10use crate::connection::{
11 ConnectionEvent, ConnectionLifecycle, ConnectionPool, ConnectionState, ReconnectJitter,
12 ResumeRequest, SubscriptionId,
13};
14use crate::embedded::{
15 EmbeddedChannelHandle, EmbeddedConversationHandle, EmptyLifecycleStream, ReadyResult,
16 SdkSubscription,
17};
18use crate::{
19 ChannelHandle, ConversationHandle, ConversationId, DeliveryAck, PressureResponse,
20 SchemaValidate, SdkError,
21};
22
23use super::config::SdkConfig;
24use super::protocol::{
25 RemoteTransport, WireConversationRequest, WirePublishRequest, WireResumeRequest,
26 WireSubscribeRequest, deserialize_payload,
27};
28use super::{RemoteConfig, ServerAddress, connection_error};
29
/// Mutable bookkeeping for a [`RemoteChannelHandle`], kept behind the handle's mutex.
#[derive(Debug)]
struct RemoteChannelState {
    /// Connection lifecycle; read by `connection_state` and driven by `reconnect` / `connected`.
    lifecycle: ConnectionLifecycle,
    /// Pool that subscriptions are assigned to and acknowledged against.
    pool: ConnectionPool,
    /// Raw value for the next [`SubscriptionId`]; starts at 0 and is incremented by
    /// `track_subscription`.
    next_subscription: u64,
}
36
/// Channel handle that forwards channel operations to a server through a [`RemoteTransport`].
///
/// Clones share the same connection state and transport, since both are held behind `Arc`.
#[derive(Clone, Debug)]
pub struct RemoteChannelHandle {
    /// Address passed to every transport call made by this handle.
    server_address: ServerAddress,
    /// Channel name placed in publish and subscribe wire requests.
    channel_name: String,
    /// Lifecycle, pool, and subscription counter, shared between clones.
    state: Arc<Mutex<RemoteChannelState>>,
    /// Transport used to reach the server.
    transport: Arc<dyn RemoteTransport>,
}
45
46impl RemoteChannelHandle {
47 pub fn new(config: &RemoteConfig) -> Result<Self, SdkError> {
53 Ok(Self {
54 server_address: config.server_address.clone(),
55 channel_name: config.channel_name.clone(),
56 state: Arc::new(Mutex::new(RemoteChannelState {
57 lifecycle: ConnectionLifecycle::new(config.reconnect_config),
58 pool: ConnectionPool::new(config.pool_config, config.reconnect_config)?,
59 next_subscription: 0,
60 })),
61 transport: Arc::clone(&config.transport),
62 })
63 }
64
65 #[must_use]
67 pub fn connection_state(&self) -> ConnectionState {
68 self.state.lock().lifecycle.state().clone()
69 }
70
71 pub fn reconnect<J>(&self, jitter: &mut J) -> Result<Duration, SdkError>
77 where
78 J: ReconnectJitter + ?Sized,
79 {
80 self.state.lock().lifecycle.reconnect(jitter)
81 }
82
83 pub fn connected(&self) -> Result<Vec<ResumeRequest>, SdkError> {
89 let requests = {
92 let mut state = self.state.lock();
93 let previous = state.lifecycle.state().clone();
94 state.lifecycle.connected()?;
95 let event = ConnectionEvent::new(previous, state.lifecycle.state().clone());
96 state.pool.resume_requests_for_transition(&event)?
97 };
98 for request in &requests {
99 let wire_request = WireResumeRequest::new(*request);
100 self.transport.resume(&self.server_address, &wire_request)?;
101 }
102 Ok(requests)
103 }
104
105 pub fn track_subscription(&self) -> Result<SubscriptionId, SdkError> {
111 let mut state = self.state.lock();
112 let id = SubscriptionId::new(state.next_subscription);
113 state.next_subscription =
114 state
115 .next_subscription
116 .checked_add(1)
117 .ok_or_else(|| SdkError::Store {
118 description: "subscription id overflow".to_string(),
119 })?;
120 state.pool.assign_subscription(id)?;
121 Ok(id)
122 }
123
124 pub fn acknowledge(
130 &self,
131 subscription_id: SubscriptionId,
132 sequence: u64,
133 ) -> Result<(), SdkError> {
134 let mut state = self.state.lock();
135 state.pool.acknowledge(subscription_id, sequence)
136 }
137
138 #[must_use]
140 pub const fn server_address(&self) -> &ServerAddress {
141 &self.server_address
142 }
143}
144
145impl ChannelHandle for RemoteChannelHandle {
146 type Subscription<M>
147 = SdkSubscription<M>
148 where
149 M: DeserializeOwned;
150
151 type ReplyFuture<'a, Resp>
152 = ReadyResult<Resp>
153 where
154 Self: 'a,
155 Resp: DeserializeOwned + 'a;
156
157 fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
158 where
159 M: Serialize + SchemaValidate,
160 {
161 let request = WirePublishRequest::new(&self.channel_name, &message)?;
162 self.transport.publish(&self.server_address, &request)
163 }
164
165 fn subscribe<M>(&self) -> Self::Subscription<M>
166 where
167 M: DeserializeOwned,
168 {
169 let outcome = self.track_subscription().and_then(|subscription_id| {
170 let connection_id = self
171 .state
172 .lock()
173 .pool
174 .connection_for_subscription(subscription_id)
175 .ok_or_else(|| connection_error("subscription was not assigned to the pool"))?;
176 let request = WireSubscribeRequest::new(
177 &self.channel_name,
178 subscription_id,
179 connection_id.get(),
180 )?;
181 self.transport.subscribe(&self.server_address, &request)
182 });
183
184 match outcome {
185 Ok(()) => SdkSubscription::empty(),
186 Err(error) => SdkSubscription::error(error),
187 }
188 }
189
190 fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
191 where
192 Req: Serialize + SchemaValidate,
193 Resp: DeserializeOwned,
194 {
195 ReadyResult::new(self.request_reply_blocking(&request))
196 }
197}
198
199impl RemoteChannelHandle {
200 pub fn publish_with_idempotency_key<M>(
215 &self,
216 message: &M,
217 idempotency_key: &str,
218 ) -> Result<DeliveryAck, SdkError>
219 where
220 M: Serialize + SchemaValidate,
221 {
222 let request =
223 WirePublishRequest::with_idempotency_key(&self.channel_name, message, idempotency_key)?;
224 self.transport
225 .publish_with_delivery(&self.server_address, &request)
226 }
227
228 fn request_reply_blocking<Req, Resp>(&self, request: &Req) -> Result<Resp, SdkError>
242 where
243 Req: Serialize + SchemaValidate,
244 Resp: DeserializeOwned,
245 {
246 let conversation_id = ConversationId::new(self.channel_name.clone());
247 let wire_request = WireConversationRequest::new(&conversation_id, request)?;
248 let reply = self
249 .transport
250 .request_reply_conversation(&self.server_address, &wire_request)?;
251 deserialize_payload(&reply)
252 }
253}
254
/// Conversation handle that exchanges messages with a server through a [`RemoteTransport`].
///
/// Clones share the lifecycle, transport, and pending-reply slot, since all three are held
/// behind `Arc`.
#[derive(Clone, Debug)]
pub struct RemoteConversationHandle {
    /// Address passed to every transport call made by this handle.
    server_address: ServerAddress,
    /// Conversation id placed in every wire request.
    conversation_id: ConversationId,
    /// Connection lifecycle; within this file it is only read, by `connection_state`.
    lifecycle: Arc<Mutex<ConnectionLifecycle>>,
    /// Transport used to reach the server.
    transport: Arc<dyn RemoteTransport>,
    /// Reply bytes stored by `request` and taken out by `receive`; `None` when nothing is
    /// pending.
    pending_reply: Arc<Mutex<Option<Vec<u8>>>>,
}
268
269impl RemoteConversationHandle {
270 #[must_use]
272 pub fn new(config: &RemoteConfig) -> Self {
273 Self {
274 server_address: config.server_address.clone(),
275 conversation_id: config.conversation_id.clone(),
276 lifecycle: Arc::new(Mutex::new(ConnectionLifecycle::new(
277 config.reconnect_config,
278 ))),
279 transport: Arc::clone(&config.transport),
280 pending_reply: Arc::new(Mutex::new(None)),
281 }
282 }
283
284 pub fn request<Req>(&self, request: Req) -> Result<(), SdkError>
299 where
300 Req: Serialize,
301 {
302 let wire_request = WireConversationRequest::new(&self.conversation_id, &request)?;
303 let reply = self
304 .transport
305 .request_reply_conversation(&self.server_address, &wire_request)?;
306 *self.pending_reply.lock() = Some(reply);
307 Ok(())
308 }
309
310 #[must_use]
312 pub fn connection_state(&self) -> ConnectionState {
313 self.lifecycle.lock().state().clone()
314 }
315
316 #[must_use]
318 pub const fn server_address(&self) -> &ServerAddress {
319 &self.server_address
320 }
321}
322
323impl ConversationHandle for RemoteConversationHandle {
324 type ReceiveFuture<'a, M>
325 = ReadyResult<M>
326 where
327 Self: 'a,
328 M: DeserializeOwned + 'a;
329
330 type LifecycleStream = EmptyLifecycleStream;
331
332 fn send<M>(&self, message: M) -> Result<(), SdkError>
333 where
334 M: Serialize,
335 {
336 let request = WireConversationRequest::new(&self.conversation_id, &message)?;
337 self.transport
338 .send_conversation(&self.server_address, &request)
339 }
340
341 fn receive<M>(&self) -> ReadyResult<M>
342 where
343 M: DeserializeOwned,
344 {
345 ReadyResult::new(self.receive_blocking())
346 }
347
348 fn lifecycle(&self) -> Self::LifecycleStream {
349 EmptyLifecycleStream
350 }
351}
352
353impl RemoteConversationHandle {
354 fn receive_blocking<M>(&self) -> Result<M, SdkError>
363 where
364 M: DeserializeOwned,
365 {
366 let payload = self
367 .pending_reply
368 .lock()
369 .take()
370 .ok_or_else(|| SdkError::Conversation {
371 conversation_id: self.conversation_id.as_str().to_string(),
372 description: "no correlated reply is pending; call request before receive"
373 .to_string(),
374 })?;
375 deserialize_payload(&payload)
376 }
377}
378
/// Channel handle that is either embedded or remote, chosen from an [`SdkConfig`].
#[derive(Clone, Debug)]
pub enum SdkChannelHandle {
    /// Handle built from `SdkConfig::Embedded`.
    Embedded(EmbeddedChannelHandle),
    /// Handle built from `SdkConfig::Remote`.
    Remote(RemoteChannelHandle),
}
387
388impl SdkChannelHandle {
389 pub fn new(config: &SdkConfig) -> Result<Self, SdkError> {
395 match config {
396 SdkConfig::Embedded(config) => Ok(Self::Embedded(EmbeddedChannelHandle::new(config))),
397 SdkConfig::Remote(config) => Ok(Self::Remote(RemoteChannelHandle::new(config)?)),
398 }
399 }
400}
401
402impl ChannelHandle for SdkChannelHandle {
403 type Subscription<M>
404 = SdkSubscription<M>
405 where
406 M: DeserializeOwned;
407
408 type ReplyFuture<'a, Resp>
409 = ReadyResult<Resp>
410 where
411 Self: 'a,
412 Resp: DeserializeOwned + 'a;
413
414 fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
415 where
416 M: Serialize + SchemaValidate,
417 {
418 match self {
419 Self::Embedded(handle) => handle.publish(message),
420 Self::Remote(handle) => handle.publish(message),
421 }
422 }
423
424 fn subscribe<M>(&self) -> Self::Subscription<M>
425 where
426 M: DeserializeOwned,
427 {
428 match self {
429 Self::Embedded(handle) => handle.subscribe(),
430 Self::Remote(handle) => handle.subscribe(),
431 }
432 }
433
434 fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
435 where
436 Req: Serialize + SchemaValidate,
437 Resp: DeserializeOwned,
438 {
439 match self {
440 Self::Embedded(handle) => handle.request_reply(request),
441 Self::Remote(handle) => handle.request_reply(request),
442 }
443 }
444}
445
/// Conversation handle that is either embedded or remote, chosen from an [`SdkConfig`].
#[derive(Clone, Debug)]
pub enum SdkConversationHandle {
    /// Handle built from `SdkConfig::Embedded`.
    Embedded(EmbeddedConversationHandle),
    /// Handle built from `SdkConfig::Remote`.
    Remote(RemoteConversationHandle),
}
454
455impl SdkConversationHandle {
456 pub fn new(config: &SdkConfig) -> Result<Self, SdkError> {
462 match config {
463 SdkConfig::Embedded(config) => {
464 Ok(Self::Embedded(EmbeddedConversationHandle::new(config)))
465 }
466 SdkConfig::Remote(config) => Ok(Self::Remote(RemoteConversationHandle::new(config))),
467 }
468 }
469}
470
471impl ConversationHandle for SdkConversationHandle {
472 type ReceiveFuture<'a, M>
473 = ReadyResult<M>
474 where
475 Self: 'a,
476 M: DeserializeOwned + 'a;
477
478 type LifecycleStream = EmptyLifecycleStream;
479
480 fn send<M>(&self, message: M) -> Result<(), SdkError>
481 where
482 M: Serialize,
483 {
484 match self {
485 Self::Embedded(handle) => handle.send(message),
486 Self::Remote(handle) => handle.send(message),
487 }
488 }
489
490 fn receive<M>(&self) -> ReadyResult<M>
491 where
492 M: DeserializeOwned,
493 {
494 match self {
495 Self::Embedded(handle) => handle.receive(),
496 Self::Remote(handle) => handle.receive(),
497 }
498 }
499
500 fn lifecycle(&self) -> Self::LifecycleStream {
501 EmptyLifecycleStream
502 }
503}