use alloc::string::{String, ToString};
use alloc::sync::Arc;
use core::fmt;
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures_core::Stream;
use serde::Serialize;
use serde::de::DeserializeOwned;
use crate::{
ChannelHandle, ConversationEvent, ConversationHandle, ConversationId, PressureResponse,
SchemaMetadata, SchemaValidate, SdkError,
};
#[cfg(test)]
mod tests;
pub trait EmbeddedChannelMessage {
#[must_use]
fn schema_metadata(&self) -> SchemaMetadata;
#[must_use]
fn type_name(&self) -> &'static str;
}
impl<M> EmbeddedChannelMessage for M
where
M: Serialize + SchemaValidate,
{
fn schema_metadata(&self) -> SchemaMetadata {
M::schema_metadata()
}
fn type_name(&self) -> &'static str {
core::any::type_name::<M>()
}
}
pub trait EmbeddedConversationMessage {
#[must_use]
fn type_name(&self) -> &'static str;
}
impl<M> EmbeddedConversationMessage for M
where
M: Serialize,
{
fn type_name(&self) -> &'static str {
core::any::type_name::<M>()
}
}
pub struct SdkSubscription<M> {
pending_error: Option<SdkError>,
message: PhantomData<M>,
}
impl<M> SdkSubscription<M> {
#[must_use]
pub const fn empty() -> Self {
Self {
pending_error: None,
message: PhantomData,
}
}
#[must_use]
pub const fn error(error: SdkError) -> Self {
Self {
pending_error: Some(error),
message: PhantomData,
}
}
#[must_use]
pub const fn is_empty(&self) -> bool {
self.pending_error.is_none()
}
}
impl<M> Default for SdkSubscription<M> {
fn default() -> Self {
Self::empty()
}
}
impl<M> fmt::Debug for SdkSubscription<M> {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("SdkSubscription")
.field("has_pending_error", &self.pending_error.is_some())
.finish()
}
}
impl<M> Unpin for SdkSubscription<M> {}
impl<M> Stream for SdkSubscription<M> {
type Item = Result<M, SdkError>;
fn poll_next(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
core::hint::black_box(context.waker());
Poll::Ready(self.pending_error.take().map(Err))
}
}
#[derive(Clone, Debug, Default)]
pub struct EmptyLifecycleStream;
impl Unpin for EmptyLifecycleStream {}
impl Stream for EmptyLifecycleStream {
type Item = ConversationEvent;
fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
core::hint::black_box((self, context.waker()));
Poll::Ready(None)
}
}
pub struct ReadyResult<T> {
result: Option<Result<T, SdkError>>,
}
impl<T> fmt::Debug for ReadyResult<T> {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter
.debug_struct("ReadyResult")
.field("is_ready", &self.result.is_some())
.finish()
}
}
impl<T> ReadyResult<T> {
#[must_use]
pub const fn new(result: Result<T, SdkError>) -> Self {
Self {
result: Some(result),
}
}
}
impl<T> Unpin for ReadyResult<T> {}
impl<T> core::future::Future for ReadyResult<T> {
type Output = Result<T, SdkError>;
fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
core::hint::black_box(context.waker());
let Some(result) = self.result.take() else {
return Poll::Ready(Err(SdkError::Protocol {
description: "ready future polled after completion".to_string(),
}));
};
Poll::Ready(result)
}
}
pub trait EmbeddedChannelBackend: fmt::Debug + Send + Sync {
fn publish(&self, message: &dyn EmbeddedChannelMessage) -> Result<PressureResponse, SdkError>;
}
pub trait EmbeddedConversationBackend: fmt::Debug + Send + Sync {
fn send(&self, message: &dyn EmbeddedConversationMessage) -> Result<(), SdkError>;
}
#[derive(Clone, Debug, Default)]
pub struct DirectEmbeddedChannelBackend;
impl EmbeddedChannelBackend for DirectEmbeddedChannelBackend {
fn publish(&self, message: &dyn EmbeddedChannelMessage) -> Result<PressureResponse, SdkError> {
let schema = message.schema_metadata();
core::hint::black_box(&schema);
Ok(PressureResponse::Accept)
}
}
#[derive(Clone, Debug, Default)]
pub struct DirectEmbeddedConversationBackend;
impl EmbeddedConversationBackend for DirectEmbeddedConversationBackend {
fn send(&self, message: &dyn EmbeddedConversationMessage) -> Result<(), SdkError> {
let type_name = message.type_name();
core::hint::black_box(type_name);
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct EmbeddedConfig {
pub channel_name: String,
pub conversation_id: ConversationId,
pub channel_backend: Arc<dyn EmbeddedChannelBackend>,
pub conversation_backend: Arc<dyn EmbeddedConversationBackend>,
}
impl EmbeddedConfig {
#[must_use]
pub fn new(
channel_name: impl Into<String>,
conversation_id: impl Into<ConversationId>,
) -> Self {
Self {
channel_name: channel_name.into(),
conversation_id: conversation_id.into(),
channel_backend: Arc::new(DirectEmbeddedChannelBackend),
conversation_backend: Arc::new(DirectEmbeddedConversationBackend),
}
}
#[must_use]
pub fn with_channel_backend(mut self, backend: Arc<dyn EmbeddedChannelBackend>) -> Self {
self.channel_backend = backend;
self
}
#[must_use]
pub fn with_conversation_backend(
mut self,
backend: Arc<dyn EmbeddedConversationBackend>,
) -> Self {
self.conversation_backend = backend;
self
}
}
#[derive(Clone, Debug)]
pub struct EmbeddedChannelHandle {
channel_name: String,
backend: Arc<dyn EmbeddedChannelBackend>,
}
impl EmbeddedChannelHandle {
#[must_use]
pub fn new(config: &EmbeddedConfig) -> Self {
Self {
channel_name: config.channel_name.clone(),
backend: Arc::clone(&config.channel_backend),
}
}
#[must_use]
pub fn channel_name(&self) -> &str {
self.channel_name.as_str()
}
}
impl ChannelHandle for EmbeddedChannelHandle {
type Subscription<M>
= SdkSubscription<M>
where
M: DeserializeOwned;
type ReplyFuture<'a, Resp>
= ReadyResult<Resp>
where
Self: 'a,
Resp: DeserializeOwned + 'a;
fn publish<M>(&self, message: M) -> Result<PressureResponse, SdkError>
where
M: Serialize + SchemaValidate,
{
self.backend.publish(&message)
}
fn subscribe<M>(&self) -> Self::Subscription<M>
where
M: DeserializeOwned,
{
SdkSubscription::empty()
}
fn request_reply<Req, Resp>(&self, request: Req) -> ReadyResult<Resp>
where
Req: Serialize + SchemaValidate,
Resp: DeserializeOwned,
{
let schema = Req::schema_metadata();
core::hint::black_box((&request, &schema));
ReadyResult::new(Err(SdkError::Protocol {
description: "embedded request/reply requires an in-process responder backend"
.to_string(),
}))
}
}
#[derive(Clone, Debug)]
pub struct EmbeddedConversationHandle {
conversation_id: ConversationId,
backend: Arc<dyn EmbeddedConversationBackend>,
}
impl EmbeddedConversationHandle {
#[must_use]
pub fn new(config: &EmbeddedConfig) -> Self {
Self {
conversation_id: config.conversation_id.clone(),
backend: Arc::clone(&config.conversation_backend),
}
}
#[must_use]
pub const fn conversation_id(&self) -> &ConversationId {
&self.conversation_id
}
}
impl ConversationHandle for EmbeddedConversationHandle {
type ReceiveFuture<'a, M>
= ReadyResult<M>
where
Self: 'a,
M: DeserializeOwned + 'a;
type LifecycleStream = EmptyLifecycleStream;
fn send<M>(&self, message: M) -> Result<(), SdkError>
where
M: Serialize,
{
self.backend.send(&message)
}
fn receive<M>(&self) -> ReadyResult<M>
where
M: DeserializeOwned,
{
ReadyResult::new(Err(SdkError::Conversation {
conversation_id: self.conversation_id.as_str().to_string(),
description: "embedded receive requires an in-process inbox backend".to_string(),
}))
}
fn lifecycle(&self) -> Self::LifecycleStream {
EmptyLifecycleStream
}
}