allora_core/
endpoint.rs

1//! Endpoint abstraction: a lightweight FIFO inbox for Exchanges.
2//!
3//! # Purpose
4//! `Endpoint` represents a minimal buffering component for messages (`Exchange`) without
5//! applying routing or processing logic. It is useful for:
6//! * Simple test harnesses (inject messages, assert ordering).
7//! * Staging / decoupling between an inbound adapter and a downstream `Channel`.
8//! * Capturing outputs in integration tests when full routing is unnecessary.
9//!
10//! For richer semantics (processors, correlation, queues of processed results) prefer
11//! the `Channel` abstraction.
12//!
13//! # Object Safety Note
14//! The trait returns `impl Future` for async methods, which makes it not object-safe.
15//! That is acceptable for current usage (direct generic or concrete types). If you need
16//! dynamic dispatch (`Box<dyn Endpoint>`), refactor to use `async_trait` instead.
17//!
18//! # Example
19//! ```rust
20//! use allora_core::{Exchange, Message};
21//! use allora_core::endpoint::{Endpoint, EndpointBuilder};
22//! let ep = EndpointBuilder::in_out().queue().build();
23//! let rt = tokio::runtime::Runtime::new().unwrap();
24//! rt.block_on(async {
25//!     ep.send(Exchange::new(Message::from_text("A"))).await.unwrap();
26//!     let received = ep.try_receive().await.unwrap();
27//!     assert_eq!(received.in_msg.body_text(), Some("A"));
28//! });
29//! ```
30
31use crate::channel::{Channel, ChannelRef};
32use crate::{error::Result, Exchange};
33use std::collections::VecDeque;
34use std::sync::Arc;
35use tokio::sync::Mutex;
36
37/// Source metadata describing origin of messages entering an endpoint.
38#[derive(Clone, Debug)]
39pub enum EndpointSource {
40    Http {
41        adapter_id: String,
42        method: String,
43        path: String,
44    },
45    Channel {
46        channel_id: String,
47    },
48}
49impl EndpointSource {
50    pub fn apply_headers(&self, exchange: &mut Exchange) {
51        match self {
52            EndpointSource::Http {
53                adapter_id,
54                method,
55                path,
56            } => {
57                if exchange.in_msg.header("source.kind").is_none() {
58                    exchange.in_msg.set_header("source.kind", "http");
59                }
60                if exchange.in_msg.header("source.adapter_id").is_none() {
61                    exchange.in_msg.set_header("source.adapter_id", adapter_id);
62                }
63                if exchange.in_msg.header("source.http.method").is_none() {
64                    exchange.in_msg.set_header("source.http.method", method);
65                }
66                if exchange.in_msg.header("source.http.path").is_none() {
67                    exchange.in_msg.set_header("source.http.path", path);
68                }
69            }
70            EndpointSource::Channel { channel_id } => {
71                if exchange.in_msg.header("source.kind").is_none() {
72                    exchange.in_msg.set_header("source.kind", "channel");
73                }
74                if exchange.in_msg.header("source.channel_id").is_none() {
75                    exchange.in_msg.set_header("source.channel_id", channel_id);
76                }
77            }
78        }
79    }
80}
81
82/// A trait representing a message endpoint for sending and receiving [`Exchange`] objects.
83pub trait Endpoint: Send + Sync {
84    /// Stable identifier for this endpoint instance.
85    fn id(&self) -> &str;
86    /// Enqueue of an Exchange.
87    fn send(&self, exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send;
88    /// Non-blocking dequeue of next Exchange.
89    fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send;
90}
91
92/// Staged builder root for endpoints.
93pub struct EndpointBuilder;
94impl EndpointBuilder {
95    pub fn in_out() -> InOutStage {
96        InOutStage
97    }
98    pub fn in_only() -> InOnlyStage {
99        InOnlyStage
100    }
101}
102pub struct InOutStage;
103pub struct InOnlyStage;
104impl InOutStage {
105    pub fn queue(self) -> InOutQueueEndpointBuilder {
106        InOutQueueEndpointBuilder {
107            id: None,
108            source: None,
109            channel: None,
110        }
111    }
112}
113impl InOnlyStage {
114    pub fn queue(self) -> InOnlyInMemoryEndpointBuilder {
115        InOnlyInMemoryEndpointBuilder {
116            id: None,
117            source: None,
118        }
119    }
120}
121/// Builder for in-out (send + receive) in-memory endpoint.
122pub struct InOutQueueEndpointBuilder {
123    id: Option<String>,
124    source: Option<EndpointSource>,
125    channel: Option<ChannelRef>,
126}
127impl InOutQueueEndpointBuilder {
128    pub fn id(mut self, id: impl Into<String>) -> Self {
129        self.id = Some(id.into());
130        self
131    }
132    pub fn channel(mut self, ch: ChannelRef) -> Self {
133        self.channel = Some(ch);
134        self
135    }
136
137    /// Directly set a source metadata descriptor.
138    pub fn source(mut self, src: EndpointSource) -> Self {
139        self.source = Some(src);
140        self
141    }
142    pub fn source_http(
143        self,
144        _adapter: &Arc<()>,
145        _method: impl Into<String>,
146        _path: impl Into<String>,
147    ) -> Self {
148        self
149    }
150    pub fn source_channel<T: Channel + 'static>(mut self, channel: &Arc<T>) -> Self {
151        self.source = Some(EndpointSource::Channel {
152            channel_id: channel.id().to_string(),
153        });
154        // implicit unsize coercion Arc<T> -> Arc<dyn Channel>
155        self.channel = Some(channel.clone());
156        self
157    }
158    pub fn build(self) -> Arc<InMemoryEndpoint> {
159        let ep = match self.id {
160            Some(id) => Arc::new(InMemoryEndpoint::with_id_and_source(
161                id,
162                self.source.clone(),
163                self.channel.clone(),
164            )),
165            None => Arc::new(InMemoryEndpoint::new_with_source(
166                self.source.clone(),
167                self.channel.clone(),
168            )),
169        };
170        ep
171    }
172}
173/// Builder for in-only (send only) in-memory endpoint.
174pub struct InOnlyInMemoryEndpointBuilder {
175    id: Option<String>,
176    source: Option<EndpointSource>,
177}
178impl InOnlyInMemoryEndpointBuilder {
179    pub fn id(mut self, id: impl Into<String>) -> Self {
180        self.id = Some(id.into());
181        self
182    }
183    pub fn source_http(
184        self,
185        _adapter: &Arc<()>,
186        _method: impl Into<String>,
187        _path: impl Into<String>,
188    ) -> Self {
189        self
190    }
191    pub fn source_channel<T: Channel + 'static>(mut self, channel: &Arc<T>) -> Self {
192        self.source = Some(EndpointSource::Channel {
193            channel_id: channel.id().to_string(),
194        });
195        self
196    }
197    pub fn build(self) -> Arc<InMemoryInOnlyEndpoint> {
198        let id = self
199            .id
200            .unwrap_or_else(|| format!("endpoint:{}", uuid::Uuid::new_v4()));
201        let ep = Arc::new(InMemoryInOnlyEndpoint {
202            id,
203            inner: std::sync::Arc::new(Mutex::new(VecDeque::new())),
204            source: self.source,
205        });
206        ep
207    }
208}
209
210/// An in-memory FIFO endpoint for quick testing.
211///
212/// Internally uses a `VecDeque` protected by a mutex (`std::sync::Mutex` or
213/// `tokio::sync::Mutex` for async mode). No backpressure, size limits, or correlation
214/// semantics are provided—this is intentionally minimal.
215#[derive(Clone, Default)]
216pub struct InMemoryEndpoint {
217    id: String,
218    inner: Arc<Mutex<VecDeque<Exchange>>>,
219    source: Option<EndpointSource>,
220    channel: Option<ChannelRef>,
221}
222
223impl InMemoryEndpoint {
224    /// Create a new empty endpoint (crate-private; use builder).
225    #[allow(dead_code)]
226    pub(crate) fn new() -> Self {
227        Self {
228            id: format!("endpoint:{}", uuid::Uuid::new_v4()),
229            inner: Arc::new(Mutex::new(VecDeque::new())),
230            source: None,
231            channel: None,
232        }
233    }
234    pub(crate) fn new_with_source(
235        source: Option<EndpointSource>,
236        channel: Option<ChannelRef>,
237    ) -> Self {
238        Self {
239            id: format!("endpoint:{}", uuid::Uuid::new_v4()),
240            inner: Arc::new(Mutex::new(VecDeque::new())),
241            source,
242            channel,
243        }
244    }
245    /// Create a new empty endpoint with a custom ID (crate-private; use builder).
246    #[allow(dead_code)]
247    pub(crate) fn with_id<S: Into<String>>(id: S) -> Self {
248        Self {
249            id: id.into(),
250            inner: Arc::new(Mutex::new(VecDeque::new())),
251            source: None,
252            channel: None,
253        }
254    }
255    #[allow(dead_code)]
256    pub(crate) fn with_id_and_source<S: Into<String>>(
257        id: S,
258        source: Option<EndpointSource>,
259        channel: Option<ChannelRef>,
260    ) -> Self {
261        Self {
262            id: id.into(),
263            inner: Arc::new(Mutex::new(VecDeque::new())),
264            source,
265            channel,
266        }
267    }
268    /// Get the ID of the endpoint.
269    pub fn id(&self) -> &str {
270        &self.id
271    }
272    pub fn source(&self) -> Option<&EndpointSource> {
273        self.source.as_ref()
274    }
275    pub fn channel(&self) -> Option<&ChannelRef> {
276        self.channel.as_ref()
277    }
278}
279
280impl Endpoint for InMemoryEndpoint {
281    fn id(&self) -> &str {
282        &self.id
283    }
284    fn send(&self, mut exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send {
285        async move {
286            if let Some(src) = &self.source {
287                src.apply_headers(&mut exchange);
288            }
289            let mut guard = self.inner.lock().await;
290            guard.push_back(exchange);
291            Ok(())
292        }
293    }
294    fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send {
295        async move {
296            let mut guard = self.inner.lock().await;
297            guard.pop_front()
298        }
299    }
300}
301
302/// In-only endpoint: supports sending but not receiving (try_receive returns None).
303#[derive(Clone, Default)]
304pub struct InMemoryInOnlyEndpoint {
305    id: String,
306    inner: Arc<Mutex<VecDeque<Exchange>>>,
307    source: Option<EndpointSource>,
308}
309impl InMemoryInOnlyEndpoint {
310    pub fn id(&self) -> &str {
311        &self.id
312    }
313}
314impl Endpoint for InMemoryInOnlyEndpoint {
315    fn id(&self) -> &str {
316        &self.id
317    }
318    fn send(&self, mut exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send {
319        async move {
320            if let Some(src) = &self.source {
321                src.apply_headers(&mut exchange);
322            }
323            let mut g = self.inner.lock().await;
324            g.push_back(exchange);
325            Ok(())
326        }
327    }
328    fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send {
329        async move { None }
330    }
331}