1use crate::channel::{Channel, ChannelRef};
32use crate::{error::Result, Exchange};
33use std::collections::VecDeque;
34use std::sync::Arc;
35use tokio::sync::Mutex;
36
/// Describes the origin of an exchange that is handed to an endpoint.
///
/// [`EndpointSource::apply_headers`] copies these fields into `source.*`
/// headers on the exchange's inbound message.
#[derive(Clone, Debug)]
pub enum EndpointSource {
    /// HTTP origin, identified by adapter, method and path.
    Http {
        /// Written to the `source.adapter_id` header.
        adapter_id: String,
        /// Written to the `source.http.method` header.
        method: String,
        /// Written to the `source.http.path` header.
        path: String,
    },
    /// Channel origin, identified by the channel's id.
    Channel {
        /// Written to the `source.channel_id` header.
        channel_id: String,
    },
}
49impl EndpointSource {
50 pub fn apply_headers(&self, exchange: &mut Exchange) {
51 match self {
52 EndpointSource::Http {
53 adapter_id,
54 method,
55 path,
56 } => {
57 if exchange.in_msg.header("source.kind").is_none() {
58 exchange.in_msg.set_header("source.kind", "http");
59 }
60 if exchange.in_msg.header("source.adapter_id").is_none() {
61 exchange.in_msg.set_header("source.adapter_id", adapter_id);
62 }
63 if exchange.in_msg.header("source.http.method").is_none() {
64 exchange.in_msg.set_header("source.http.method", method);
65 }
66 if exchange.in_msg.header("source.http.path").is_none() {
67 exchange.in_msg.set_header("source.http.path", path);
68 }
69 }
70 EndpointSource::Channel { channel_id } => {
71 if exchange.in_msg.header("source.kind").is_none() {
72 exchange.in_msg.set_header("source.kind", "channel");
73 }
74 if exchange.in_msg.header("source.channel_id").is_none() {
75 exchange.in_msg.set_header("source.channel_id", channel_id);
76 }
77 }
78 }
79 }
80}
81
/// A message endpoint that exchanges can be sent to and polled from.
///
/// Implementors must be `Send + Sync`, and the futures returned by `send`
/// and `try_receive` must be `Send`.
pub trait Endpoint: Send + Sync {
    /// Returns this endpoint's identifier.
    fn id(&self) -> &str;
    /// Hands `exchange` to the endpoint; the future resolves to `Ok(())` on success.
    fn send(&self, exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send;
    /// Polls the endpoint for an exchange; the future resolves to `None` when
    /// the implementation has nothing to hand out.
    fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send;
}
91
92pub struct EndpointBuilder;
94impl EndpointBuilder {
95 pub fn in_out() -> InOutStage {
96 InOutStage
97 }
98 pub fn in_only() -> InOnlyStage {
99 InOnlyStage
100 }
101}
/// Builder stage returned by `EndpointBuilder::in_out`.
pub struct InOutStage;
/// Builder stage returned by `EndpointBuilder::in_only`.
pub struct InOnlyStage;
104impl InOutStage {
105 pub fn queue(self) -> InOutQueueEndpointBuilder {
106 InOutQueueEndpointBuilder {
107 id: None,
108 source: None,
109 channel: None,
110 }
111 }
112}
113impl InOnlyStage {
114 pub fn queue(self) -> InOnlyInMemoryEndpointBuilder {
115 InOnlyInMemoryEndpointBuilder {
116 id: None,
117 source: None,
118 }
119 }
120}
/// Builder for an in-out [`InMemoryEndpoint`].
pub struct InOutQueueEndpointBuilder {
    /// Explicit endpoint id; when `None`, `build` lets the endpoint generate one.
    id: Option<String>,
    /// Source metadata passed on to the built endpoint.
    source: Option<EndpointSource>,
    /// Channel handle passed on to the built endpoint.
    channel: Option<ChannelRef>,
}
127impl InOutQueueEndpointBuilder {
128 pub fn id(mut self, id: impl Into<String>) -> Self {
129 self.id = Some(id.into());
130 self
131 }
132 pub fn channel(mut self, ch: ChannelRef) -> Self {
133 self.channel = Some(ch);
134 self
135 }
136
137 pub fn source(mut self, src: EndpointSource) -> Self {
139 self.source = Some(src);
140 self
141 }
142 pub fn source_http(
143 self,
144 _adapter: &Arc<()>,
145 _method: impl Into<String>,
146 _path: impl Into<String>,
147 ) -> Self {
148 self
149 }
150 pub fn source_channel<T: Channel + 'static>(mut self, channel: &Arc<T>) -> Self {
151 self.source = Some(EndpointSource::Channel {
152 channel_id: channel.id().to_string(),
153 });
154 self.channel = Some(channel.clone());
156 self
157 }
158 pub fn build(self) -> Arc<InMemoryEndpoint> {
159 let ep = match self.id {
160 Some(id) => Arc::new(InMemoryEndpoint::with_id_and_source(
161 id,
162 self.source.clone(),
163 self.channel.clone(),
164 )),
165 None => Arc::new(InMemoryEndpoint::new_with_source(
166 self.source.clone(),
167 self.channel.clone(),
168 )),
169 };
170 ep
171 }
172}
/// Builder for an [`InMemoryInOnlyEndpoint`].
pub struct InOnlyInMemoryEndpointBuilder {
    /// Explicit endpoint id; when `None`, `build` generates `endpoint:<uuid>`.
    id: Option<String>,
    /// Source metadata passed on to the built endpoint.
    source: Option<EndpointSource>,
}
178impl InOnlyInMemoryEndpointBuilder {
179 pub fn id(mut self, id: impl Into<String>) -> Self {
180 self.id = Some(id.into());
181 self
182 }
183 pub fn source_http(
184 self,
185 _adapter: &Arc<()>,
186 _method: impl Into<String>,
187 _path: impl Into<String>,
188 ) -> Self {
189 self
190 }
191 pub fn source_channel<T: Channel + 'static>(mut self, channel: &Arc<T>) -> Self {
192 self.source = Some(EndpointSource::Channel {
193 channel_id: channel.id().to_string(),
194 });
195 self
196 }
197 pub fn build(self) -> Arc<InMemoryInOnlyEndpoint> {
198 let id = self
199 .id
200 .unwrap_or_else(|| format!("endpoint:{}", uuid::Uuid::new_v4()));
201 let ep = Arc::new(InMemoryInOnlyEndpoint {
202 id,
203 inner: std::sync::Arc::new(Mutex::new(VecDeque::new())),
204 source: self.source,
205 });
206 ep
207 }
208}
209
/// Endpoint backed by an in-memory queue of exchanges.
///
/// Clones share the same queue because `inner` is an `Arc`. The derived
/// `Default` yields an empty id, an empty queue, and no source or channel.
#[derive(Clone, Default)]
pub struct InMemoryEndpoint {
    /// Identifier returned by `id()`.
    id: String,
    /// Queued exchanges: `send` pushes to the back, `try_receive` pops from the front.
    inner: Arc<Mutex<VecDeque<Exchange>>>,
    /// When set, its headers are applied to every exchange passed to `send`.
    source: Option<EndpointSource>,
    /// Optional channel handle exposed through `channel()`.
    channel: Option<ChannelRef>,
}
222
223impl InMemoryEndpoint {
224 #[allow(dead_code)]
226 pub(crate) fn new() -> Self {
227 Self {
228 id: format!("endpoint:{}", uuid::Uuid::new_v4()),
229 inner: Arc::new(Mutex::new(VecDeque::new())),
230 source: None,
231 channel: None,
232 }
233 }
234 pub(crate) fn new_with_source(
235 source: Option<EndpointSource>,
236 channel: Option<ChannelRef>,
237 ) -> Self {
238 Self {
239 id: format!("endpoint:{}", uuid::Uuid::new_v4()),
240 inner: Arc::new(Mutex::new(VecDeque::new())),
241 source,
242 channel,
243 }
244 }
245 #[allow(dead_code)]
247 pub(crate) fn with_id<S: Into<String>>(id: S) -> Self {
248 Self {
249 id: id.into(),
250 inner: Arc::new(Mutex::new(VecDeque::new())),
251 source: None,
252 channel: None,
253 }
254 }
255 #[allow(dead_code)]
256 pub(crate) fn with_id_and_source<S: Into<String>>(
257 id: S,
258 source: Option<EndpointSource>,
259 channel: Option<ChannelRef>,
260 ) -> Self {
261 Self {
262 id: id.into(),
263 inner: Arc::new(Mutex::new(VecDeque::new())),
264 source,
265 channel,
266 }
267 }
268 pub fn id(&self) -> &str {
270 &self.id
271 }
272 pub fn source(&self) -> Option<&EndpointSource> {
273 self.source.as_ref()
274 }
275 pub fn channel(&self) -> Option<&ChannelRef> {
276 self.channel.as_ref()
277 }
278}
279
280impl Endpoint for InMemoryEndpoint {
281 fn id(&self) -> &str {
282 &self.id
283 }
284 fn send(&self, mut exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send {
285 async move {
286 if let Some(src) = &self.source {
287 src.apply_headers(&mut exchange);
288 }
289 let mut guard = self.inner.lock().await;
290 guard.push_back(exchange);
291 Ok(())
292 }
293 }
294 fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send {
295 async move {
296 let mut guard = self.inner.lock().await;
297 guard.pop_front()
298 }
299 }
300}
301
/// In-only endpoint backed by an in-memory queue of exchanges.
///
/// Clones share the same queue because `inner` is an `Arc`. The derived
/// `Default` yields an empty id, an empty queue, and no source.
#[derive(Clone, Default)]
pub struct InMemoryInOnlyEndpoint {
    /// Identifier returned by `id()`.
    id: String,
    /// Exchanges accepted by `send`, in arrival order.
    inner: Arc<Mutex<VecDeque<Exchange>>>,
    /// When set, its headers are applied to every exchange passed to `send`.
    source: Option<EndpointSource>,
}
309impl InMemoryInOnlyEndpoint {
310 pub fn id(&self) -> &str {
311 &self.id
312 }
313}
314impl Endpoint for InMemoryInOnlyEndpoint {
315 fn id(&self) -> &str {
316 &self.id
317 }
318 fn send(&self, mut exchange: Exchange) -> impl std::future::Future<Output = Result<()>> + Send {
319 async move {
320 if let Some(src) = &self.source {
321 src.apply_headers(&mut exchange);
322 }
323 let mut g = self.inner.lock().await;
324 g.push_back(exchange);
325 Ok(())
326 }
327 }
328 fn try_receive(&self) -> impl std::future::Future<Output = Option<Exchange>> + Send {
329 async move { None }
330 }
331}