allora-core 0.0.6

Core primitives for Allora: messages, exchanges, channels, processors, patterns (EIP) for Rust integration flows.
Documentation
//! Adapter abstractions: unify how external systems (inbound) and outbound endpoints
//! integrate with Allora's `Exchange` / `Message` pipeline.
//!
//! # Overview
//! * [`Adapter`]: Marker trait implemented by all adapter types (inbound or outbound).
//! * [`InboundAdapter`]: Lifecycle-driven receiver of external stimuli (HTTP server, queue consumer).
//! * [`OutboundAdapter`]: Dispatcher of an `Exchange` toward an external system (HTTP client, producer).
//! * [`ensure_correlation`]: Helper guaranteeing a `correlation_id` header on the inbound `Message`.
//!
//! # Goals
//! * Provide a small, stable surface for decorating adapters (tracing, metrics, auth).
//! * Separate inbound (long-running run loop) from outbound (per-dispatch) semantics.
//! * Encourage early correlation id creation for downstream EIP patterns.
//!
//! # Inbound Example
//! ```
//! use allora_core::{adapter::InboundAdapter, Result};
//! use async_trait::async_trait;
//! #[derive(Debug)]
//! struct StubInbound;
//! impl allora_core::adapter::BaseAdapter for StubInbound { fn id(&self) -> &str { "stub_inbound" } }
//! #[async_trait]
//! impl InboundAdapter for StubInbound {
//!     async fn run(&self) -> Result<()> { Ok(()) }
//! }
//! let stub = StubInbound;
//! // Async usage only; no feature gating.
//! tokio::runtime::Runtime::new().unwrap().block_on(async { stub.run().await.unwrap(); });
//! ```
//!
//! # Outbound Example (Hypothetical)
//! ```
//! use allora_core::{adapter::{OutboundAdapter , OutboundDispatchResult}, error::Result, Exchange, Message};
//! use async_trait::async_trait;
//! #[derive(Debug)] struct LoggingOutbound;
//! impl allora_core::adapter::BaseAdapter for LoggingOutbound { fn id(&self) -> &str { "logging_outbound" } }
//! #[async_trait]
//! impl OutboundAdapter for LoggingOutbound {
//!     async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
//!         Ok(OutboundDispatchResult {
//!             acknowledged: true,
//!             message: exchange
//!                 .out_msg
//!                 .as_ref()
//!                 .and_then(|m| m.body_text())
//!                 .map(|s| format!("echo:{s}")),
//!             status_code: None,
//!             body: None,
//!         })
//!     }
//! }
//! let adapter = LoggingOutbound;
//! let mut exchange = Exchange::new(Message::from_text("hello"));
//! exchange.out_msg = Some(Message::from_text("world"));
//! // Async usage only; no feature gating.
//! let res = tokio::runtime::Runtime::new().unwrap().block_on(async { adapter.dispatch(&exchange).await.unwrap() });
//! {
//!     assert!(res.acknowledged);
//!     assert_eq!(res.message.unwrap(), "echo:world");
//! }
//! ```
//!
//! # Correlation Example
//! ```rust
//! use allora_core::{adapter::ensure_correlation, Message, Exchange};
//! let mut exchange = Exchange::new(Message::from_text("payload"));
//! ensure_correlation(&mut exchange);
//! assert!(exchange.in_msg.header("correlation_id").is_some());
//! ```
//!
//! # Correlation Strategy
//! Correlation IDs are lazily generated by [`Exchange::correlation_id`]. Inbound adapters should
//! call [`ensure_correlation`] immediately after constructing the `Exchange` to guarantee downstream
//! processors (aggregators, splitters, request/reply) can rely on the header.
//!
//! # Future Extensions
//! * Middleware chain for adapters (auth, rate limiting).
//! * Backpressure signaling (rejecting inbound requests under load).
//! * Retry / circuit breaker policies for outbound dispatch.
//! * Unified metrics façade (requests_total, dispatch_latency_seconds).

use crate::{error::Result, Exchange};
use async_trait::async_trait;
use std::fmt::Debug;

/// Marker trait for all adapters (inbound or outbound). Intentionally empty so it can be
/// used for blanket implementations of decoration layers. Do NOT add methods here unless they
/// apply uniformly to both inbound and outbound adapters.
pub trait BaseAdapter: Send + Sync + Debug {
    /// Stable identifier for this adapter instance (user-assigned or generated).
    fn id(&self) -> &str;
}

/// Inbound adapter: receives external data/events and produces `Exchange`s routed inside Allora.
///
/// Implementations should:
/// * Parse / normalize an external protocol entity into a `Message` (payload + headers).
/// * Invoke [`crate::adapter::ensure_correlation`] before dispatch.
/// * Dispatch the `Exchange` via a channel / route.
/// * Manage lifecycle (bind ports, subscribe to topics, handle shutdown).
#[async_trait]
pub trait InboundAdapter: BaseAdapter {
    /// Run the adapter until stopped (lifecycle). Implementations define their own shutdown semantics.
    async fn run(&self) -> Result<()>;
}

/// Metadata returned from outbound dispatch. Leave fields optional / expandable for future.
#[allow(dead_code)]
#[derive(Debug, Default)]
pub struct OutboundDispatchResult {
    /// Whether the external system acknowledged / accepted the dispatch.
    pub acknowledged: bool,
    /// Optional diagnostic or remote system message (e.g. HTTP status text, broker ack info).
    pub message: Option<String>,
    /// Optional HTTP status code for the dispatch.
    pub status_code: Option<u16>,
    /// Optional body for the dispatch, if applicable.
    pub body: Option<String>,
}

/// Outbound adapter: sends data derived from an `Exchange` to an external system.
///
/// Implementations should:
/// * Select `in_msg` or `out_msg` as the source for serialization (document behavior).
/// * Propagate correlation / message ids outward if relevant (tracing continuity).
/// * Map remote responses (status, id) back into the returned `OutboundDispatchResult`.
#[allow(dead_code)]
#[async_trait]
pub trait OutboundAdapter: BaseAdapter {
    /// Perform a single outbound dispatch. Return an `OutboundDispatchResult` describing outcome.
    async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult>;
}

/// Ensure a `correlation_id` header exists on the inbound message of the provided `Exchange`.
/// Safe to call multiple times (id will be stable after first generation).
pub fn ensure_correlation(exchange: &mut Exchange) {
    if let Some(existing) = exchange.in_msg.header("corr_id").map(|s| s.to_string()) {
        if exchange.in_msg.header("correlation_id").is_none() {
            exchange.in_msg.set_header("correlation_id", &existing);
        }
        return;
    }
    if let Some(cid) = exchange
        .in_msg
        .header("correlation_id")
        .map(|s| s.to_string())
    {
        exchange.in_msg.set_header("corr_id", &cid);
        return;
    }
    let new_id = uuid::Uuid::new_v4().to_string();
    exchange.in_msg.set_header("corr_id", &new_id);
    exchange.in_msg.set_header("correlation_id", &new_id);
}

/// Staged builder root: pattern-first entry (`Adapter::inbound().http()...`).
pub struct Adapter;
impl Adapter {
    pub fn inbound() -> InboundStage {
        InboundStage
    }
    pub fn outbound() -> OutboundStage {
        OutboundStage
    }
}
pub struct InboundStage;
pub struct OutboundStage;