allora_core/adapter.rs
1//! Adapter abstractions: unify how external systems (inbound) and outbound endpoints
2//! integrate with Allora's `Exchange` / `Message` pipeline.
3//!
4//! # Overview
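//! * [`BaseAdapter`]: Base trait implemented by all adapter types (inbound or outbound); exposes a stable `id`.
//! * [`Adapter`]: Staged builder root (`Adapter::inbound()` / `Adapter::outbound()`).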
6//! * [`InboundAdapter`]: Lifecycle-driven receiver of external stimuli (HTTP server, queue consumer).
7//! * [`OutboundAdapter`]: Dispatcher of an `Exchange` toward an external system (HTTP client, producer).
8//! * [`ensure_correlation`]: Helper guaranteeing a `correlation_id` header on the inbound `Message`.
9//!
10//! # Goals
11//! * Provide a small, stable surface for decorating adapters (tracing, metrics, auth).
12//! * Separate inbound (long-running run loop) from outbound (per-dispatch) semantics.
13//! * Encourage early correlation id creation for downstream EIP patterns.
14//!
15//! # Inbound Example
16//! ```
//! use allora_core::{adapter::InboundAdapter, error::Result};
18//! use async_trait::async_trait;
19//! #[derive(Debug)]
20//! struct StubInbound;
21//! impl allora_core::adapter::BaseAdapter for StubInbound { fn id(&self) -> &str { "stub_inbound" } }
22//! #[async_trait]
23//! impl InboundAdapter for StubInbound {
//!     async fn run(&self) -> Result<()> { Ok(()) }
25//! }
26//! let stub = StubInbound;
27//! // Async usage only; no feature gating.
28//! tokio::runtime::Runtime::new().unwrap().block_on(async { stub.run().await.unwrap(); });
29//! ```
30//!
31//! # Outbound Example (Hypothetical)
32//! ```
//! use allora_core::{adapter::{OutboundAdapter, OutboundDispatchResult}, error::Result, Exchange, Message};
34//! use async_trait::async_trait;
35//! #[derive(Debug)] struct LoggingOutbound;
36//! impl allora_core::adapter::BaseAdapter for LoggingOutbound { fn id(&self) -> &str { "logging_outbound" } }
37//! #[async_trait]
38//! impl OutboundAdapter for LoggingOutbound {
//!     async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
//!         Ok(OutboundDispatchResult {
//!             acknowledged: true,
//!             message: exchange
//!                 .out_msg
//!                 .as_ref()
//!                 .and_then(|m| m.body_text())
//!                 .map(|s| format!("echo:{s}")),
//!             status_code: None,
//!             body: None,
//!         })
//!     }
51//! }
52//! let adapter = LoggingOutbound;
53//! let mut exchange = Exchange::new(Message::from_text("hello"));
54//! exchange.out_msg = Some(Message::from_text("world"));
55//! // Async usage only; no feature gating.
56//! let res = tokio::runtime::Runtime::new().unwrap().block_on(async { adapter.dispatch(&exchange).await.unwrap() });
57//! {
//!     assert!(res.acknowledged);
//!     assert_eq!(res.message.unwrap(), "echo:world");
60//! }
61//! ```
62//!
63//! # Correlation Example
64//! ```rust
65//! use allora_core::{adapter::ensure_correlation, Message, Exchange};
66//! let mut exchange = Exchange::new(Message::from_text("payload"));
67//! ensure_correlation(&mut exchange);
68//! assert!(exchange.in_msg.header("correlation_id").is_some());
69//! ```
70//!
71//! # Correlation Strategy
72//! Correlation IDs are lazily generated by [`Exchange::correlation_id`]. Inbound adapters should
73//! call [`ensure_correlation`] immediately after constructing the `Exchange` to guarantee downstream
74//! processors (aggregators, splitters, request/reply) can rely on the header.
75//!
76//! # Future Extensions
77//! * Middleware chain for adapters (auth, rate limiting).
78//! * Backpressure signaling (rejecting inbound requests under load).
79//! * Retry / circuit breaker policies for outbound dispatch.
80//! * Unified metrics façade (requests_total, dispatch_latency_seconds).
81
82use crate::{error::Result, Exchange};
83use async_trait::async_trait;
84use std::fmt::Debug;
85
/// Base trait shared by every adapter, inbound or outbound.
///
/// Keep this trait minimal so decoration layers (tracing, metrics, auth) can be written
/// against it generically. Only add methods that apply uniformly to both inbound and
/// outbound adapters; direction-specific behaviour belongs on [`InboundAdapter`] or
/// [`OutboundAdapter`].
///
/// The `Send + Sync + Debug` supertraits are required of every implementor, so an adapter
/// can always be shared across threads and printed in diagnostics.
pub trait BaseAdapter: Send + Sync + Debug {
    /// Stable identifier for this adapter instance (user-assigned or generated).
    fn id(&self) -> &str;
}
93
/// Inbound adapter: receives external data/events and produces `Exchange`s routed inside Allora.
///
/// Implementations should:
/// * Parse / normalize an external protocol entity into a `Message` (payload + headers).
/// * Invoke [`crate::adapter::ensure_correlation`] before dispatch.
/// * Dispatch the `Exchange` via a channel / route.
/// * Manage lifecycle (bind ports, subscribe to topics, handle shutdown).
///
/// Every inbound adapter is also a [`BaseAdapter`], so it exposes a stable `id` and is
/// `Send + Sync + Debug`.
#[async_trait]
pub trait InboundAdapter: BaseAdapter {
    /// Run the adapter until stopped (lifecycle). Implementations define their own shutdown semantics.
    ///
    /// Takes `&self`, so any state changed while running has to be managed by the
    /// implementation behind a shared reference.
    ///
    /// # Errors
    /// Returns `Err` when the implementation reports a failure; the exact conditions are
    /// implementation-defined.
    async fn run(&self) -> Result<()>;
}
106
/// Metadata returned from an outbound dispatch.
///
/// Only `acknowledged` is mandatory; the remaining fields are optional so adapters for
/// different transports can fill in whatever the remote system provides.
///
/// `Default` yields an unacknowledged result with every optional field set to `None`.
/// `Clone`, `PartialEq` and `Eq` are derived so results can be duplicated and compared
/// directly (e.g. with `assert_eq!` in tests).
// NOTE(review): the reason for `allow(dead_code)` is not visible here — presumably it
// silences warnings in builds that compile no outbound adapter; confirm or remove.
#[allow(dead_code)]
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct OutboundDispatchResult {
    /// Whether the external system acknowledged / accepted the dispatch.
    pub acknowledged: bool,
    /// Optional diagnostic or remote system message (e.g. HTTP status text, broker ack info).
    pub message: Option<String>,
    /// Optional HTTP status code for the dispatch.
    pub status_code: Option<u16>,
    /// Optional body for the dispatch, if applicable.
    pub body: Option<String>,
}
120
/// Outbound adapter: sends data derived from an `Exchange` to an external system.
///
/// Implementations should:
/// * Select `in_msg` or `out_msg` as the source for serialization (document behavior).
/// * Propagate correlation / message ids outward if relevant (tracing continuity).
/// * Map remote responses (status, id) back into the returned `OutboundDispatchResult`.
///
/// Every outbound adapter is also a [`BaseAdapter`], so it exposes a stable `id` and is
/// `Send + Sync + Debug`.
// NOTE(review): the reason for `allow(dead_code)` on this public trait is not visible
// here — confirm it is still needed.
#[allow(dead_code)]
#[async_trait]
pub trait OutboundAdapter: BaseAdapter {
    /// Perform a single outbound dispatch. Return an `OutboundDispatchResult` describing outcome.
    ///
    /// The `Exchange` is only borrowed immutably, so a dispatch cannot modify it.
    ///
    /// # Errors
    /// Returns `Err` when the implementation reports a failure; the exact conditions are
    /// implementation-defined.
    async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult>;
}
133
134/// Ensure a `correlation_id` header exists on the inbound message of the provided `Exchange`.
135/// Safe to call multiple times (id will be stable after first generation).
136pub fn ensure_correlation(exchange: &mut Exchange) {
137 if let Some(existing) = exchange.in_msg.header("corr_id").map(|s| s.to_string()) {
138 if exchange.in_msg.header("correlation_id").is_none() {
139 exchange.in_msg.set_header("correlation_id", &existing);
140 }
141 return;
142 }
143 if let Some(cid) = exchange
144 .in_msg
145 .header("correlation_id")
146 .map(|s| s.to_string())
147 {
148 exchange.in_msg.set_header("corr_id", &cid);
149 return;
150 }
151 let new_id = uuid::Uuid::new_v4().to_string();
152 exchange.in_msg.set_header("corr_id", &new_id);
153 exchange.in_msg.set_header("correlation_id", &new_id);
154}
155
/// Staged builder root: pattern-first entry (`Adapter::inbound().http()...`).
///
/// `Adapter` holds no data; it only namespaces the constructors for the first builder stage.
#[derive(Debug)]
pub struct Adapter;

impl Adapter {
    /// Begin building an inbound adapter.
    pub fn inbound() -> InboundStage {
        InboundStage
    }

    /// Begin building an outbound adapter.
    pub fn outbound() -> OutboundStage {
        OutboundStage
    }
}

/// First builder stage for inbound adapters, returned by [`Adapter::inbound`].
///
/// NOTE(review): protocol-specific steps (e.g. `.http()`) are presumably attached
/// elsewhere — confirm.
#[derive(Debug)]
pub struct InboundStage;

/// First builder stage for outbound adapters, returned by [`Adapter::outbound`].
///
/// NOTE(review): protocol-specific steps are presumably attached elsewhere — confirm.
#[derive(Debug)]
pub struct OutboundStage;