use crate::channel::{Channel, ChannelRef};
use crate::{error::Result, Exchange};
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Clone, Debug)]
pub enum EndpointSource {
Http {
adapter_id: String,
method: String,
path: String,
},
Channel {
channel_id: String,
},
}
impl EndpointSource {
pub fn apply_headers(&self, exchange: &mut Exchange) {
match self {
EndpointSource::Http {
adapter_id,
method,
path,
} => {
if exchange.in_msg.header("source.kind").is_none() {
exchange.in_msg.set_header("source.kind", "http");
}
if exchange.in_msg.header("source.adapter_id").is_none() {
exchange.in_msg.set_header("source.adapter_id", adapter_id);
}
if exchange.in_msg.header("source.http.method").is_none() {
exchange.in_msg.set_header("source.http.method", method);
}
if exchange.in_msg.header("source.http.path").is_none() {
exchange.in_msg.set_header("source.http.path", path);
}
}
EndpointSource::Channel { channel_id } => {
if exchange.in_msg.header("source.kind").is_none() {
exchange.in_msg.set_header("source.kind", "channel");
}
if exchange.in_msg.header("source.channel_id").is_none() {
exchange.in_msg.set_header("source.channel_id", channel_id);
}
}
}
}
}
pub trait Endpoint: Send + Sync {
fn id(&self) -> &str;
fn send(&self, exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send;
fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send;
}
pub struct EndpointBuilder;
impl EndpointBuilder {
pub fn in_out() -> InOutStage {
InOutStage
}
pub fn in_only() -> InOnlyStage {
InOnlyStage
}
}
pub struct InOutStage;
pub struct InOnlyStage;
impl InOutStage {
pub fn queue(self) -> InOutQueueEndpointBuilder {
InOutQueueEndpointBuilder {
id: None,
source: None,
channel: None,
}
}
}
impl InOnlyStage {
pub fn queue(self) -> InOnlyInMemoryEndpointBuilder {
InOnlyInMemoryEndpointBuilder {
id: None,
source: None,
}
}
}
pub struct InOutQueueEndpointBuilder {
id: Option<String>,
source: Option<EndpointSource>,
channel: Option<ChannelRef>,
}
impl InOutQueueEndpointBuilder {
pub fn id(mut self, id: impl Into<String>) -> Self {
self.id = Some(id.into());
self
}
pub fn channel(mut self, ch: ChannelRef) -> Self {
self.channel = Some(ch);
self
}
pub fn source(mut self, src: EndpointSource) -> Self {
self.source = Some(src);
self
}
pub fn source_http(
self,
_adapter: &Arc<()>,
_method: impl Into<String>,
_path: impl Into<String>,
) -> Self {
self
}
pub fn source_channel<T: Channel + 'static>(mut self, channel: &Arc<T>) -> Self {
self.source = Some(EndpointSource::Channel {
channel_id: channel.id().to_string(),
});
self.channel = Some(channel.clone());
self
}
pub fn build(self) -> Arc<InMemoryEndpoint> {
let ep = match self.id {
Some(id) => Arc::new(InMemoryEndpoint::with_id_and_source(
id,
self.source.clone(),
self.channel.clone(),
)),
None => Arc::new(InMemoryEndpoint::new_with_source(
self.source.clone(),
self.channel.clone(),
)),
};
ep
}
}
pub struct InOnlyInMemoryEndpointBuilder {
id: Option<String>,
source: Option<EndpointSource>,
}
impl InOnlyInMemoryEndpointBuilder {
pub fn id(mut self, id: impl Into<String>) -> Self {
self.id = Some(id.into());
self
}
pub fn source_http(
self,
_adapter: &Arc<()>,
_method: impl Into<String>,
_path: impl Into<String>,
) -> Self {
self
}
pub fn source_channel<T: Channel + 'static>(mut self, channel: &Arc<T>) -> Self {
self.source = Some(EndpointSource::Channel {
channel_id: channel.id().to_string(),
});
self
}
pub fn build(self) -> Arc<InMemoryInOnlyEndpoint> {
let id = self
.id
.unwrap_or_else(|| format!("endpoint:{}", uuid::Uuid::new_v4()));
let ep = Arc::new(InMemoryInOnlyEndpoint {
id,
inner: std::sync::Arc::new(Mutex::new(VecDeque::new())),
source: self.source,
});
ep
}
}
#[derive(Clone, Default)]
pub struct InMemoryEndpoint {
id: String,
inner: Arc<Mutex<VecDeque<Exchange>>>,
source: Option<EndpointSource>,
channel: Option<ChannelRef>,
}
impl InMemoryEndpoint {
#[allow(dead_code)]
pub(crate) fn new() -> Self {
Self {
id: format!("endpoint:{}", uuid::Uuid::new_v4()),
inner: Arc::new(Mutex::new(VecDeque::new())),
source: None,
channel: None,
}
}
pub(crate) fn new_with_source(
source: Option<EndpointSource>,
channel: Option<ChannelRef>,
) -> Self {
Self {
id: format!("endpoint:{}", uuid::Uuid::new_v4()),
inner: Arc::new(Mutex::new(VecDeque::new())),
source,
channel,
}
}
#[allow(dead_code)]
pub(crate) fn with_id<S: Into<String>>(id: S) -> Self {
Self {
id: id.into(),
inner: Arc::new(Mutex::new(VecDeque::new())),
source: None,
channel: None,
}
}
#[allow(dead_code)]
pub(crate) fn with_id_and_source<S: Into<String>>(
id: S,
source: Option<EndpointSource>,
channel: Option<ChannelRef>,
) -> Self {
Self {
id: id.into(),
inner: Arc::new(Mutex::new(VecDeque::new())),
source,
channel,
}
}
pub fn id(&self) -> &str {
&self.id
}
pub fn source(&self) -> Option<&EndpointSource> {
self.source.as_ref()
}
pub fn channel(&self) -> Option<&ChannelRef> {
self.channel.as_ref()
}
}
impl Endpoint for InMemoryEndpoint {
fn id(&self) -> &str {
&self.id
}
fn send(&self, mut exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send {
async move {
if let Some(src) = &self.source {
src.apply_headers(&mut exchange);
}
let mut guard = self.inner.lock().await;
guard.push_back(exchange);
Ok(())
}
}
fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send {
async move {
let mut guard = self.inner.lock().await;
guard.pop_front()
}
}
}
#[derive(Clone, Default)]
pub struct InMemoryInOnlyEndpoint {
id: String,
inner: Arc<Mutex<VecDeque<Exchange>>>,
source: Option<EndpointSource>,
}
impl InMemoryInOnlyEndpoint {
pub fn id(&self) -> &str {
&self.id
}
}
impl Endpoint for InMemoryInOnlyEndpoint {
fn id(&self) -> &str {
&self.id
}
fn send(&self, mut exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send {
async move {
if let Some(src) = &self.source {
src.apply_headers(&mut exchange);
}
let mut g = self.inner.lock().await;
g.push_back(exchange);
Ok(())
}
}
fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send {
async move { None }
}
}