Skip to main content

fraiseql_core/runtime/subscription/
transport.rs

1use async_trait::async_trait;
2
3use super::{SubscriptionError, types::SubscriptionEvent};
4
// =============================================================================
// Transport Adapters
// =============================================================================
8
9/// Transport adapter trait for delivering subscription events.
10///
11/// Transport adapters are responsible for delivering events to external systems.
12/// Each adapter implements a specific delivery mechanism (HTTP, Kafka, etc.).
13///
14/// # Implementors
15///
16/// - [`super::WebhookAdapter`] - HTTP POST delivery with retry logic
17/// - [`super::KafkaAdapter`] - Apache Kafka event streaming
18// Reason: used as dyn Trait (Box<dyn TransportAdapter>); async_trait ensures Send bounds and
19// dyn-compatibility async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
20#[async_trait]
21pub trait TransportAdapter: Send + Sync {
22    /// Deliver an event to the transport.
23    ///
24    /// # Arguments
25    ///
26    /// * `event` - The subscription event to deliver
27    /// * `subscription_name` - Name of the subscription
28    ///
29    /// # Returns
30    ///
31    /// `Ok(())` on successful delivery, `Err` on failure.
32    async fn deliver(
33        &self,
34        event: &SubscriptionEvent,
35        subscription_name: &str,
36    ) -> Result<(), SubscriptionError>;
37
38    /// Get the adapter name for logging/metrics.
39    fn name(&self) -> &'static str;
40
41    /// Check if the adapter is healthy/connected.
42    async fn health_check(&self) -> bool;
43}
44
45/// Type alias for boxed dynamic transport adapter.
46///
47/// Used to store transport adapters without generic type parameters.
48pub type BoxDynTransportAdapter = Box<dyn TransportAdapter>;
49
50/// Multi-transport delivery manager.
51///
52/// Manages multiple transport adapters and delivers events to all configured
53/// destinations in parallel.
54///
55/// # Example
56///
57/// ```no_run
58/// // Requires: live transport destination (webhook/NATS/etc).
59/// use fraiseql_core::runtime::subscription::{
60///     TransportManager, WebhookAdapter, WebhookTransportConfig, SubscriptionEvent,
61/// };
62///
63/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
64/// # let event: SubscriptionEvent = panic!("example");
65/// let mut manager = TransportManager::new();
66///
67/// // Add webhook adapter
68/// let webhook = WebhookAdapter::new(WebhookTransportConfig::new("https://api.example.com/events"))?;
69/// manager.add_adapter(Box::new(webhook));
70///
71/// // Deliver to all transports
72/// manager.deliver_all(&event, "orderCreated").await?;
73/// # Ok(())
74/// # }
75/// ```
76pub struct TransportManager {
77    adapters: Vec<BoxDynTransportAdapter>,
78}
79
80impl TransportManager {
81    /// Create a new transport manager.
82    #[must_use]
83    pub fn new() -> Self {
84        Self {
85            adapters: Vec::new(),
86        }
87    }
88
89    /// Add a transport adapter.
90    pub fn add_adapter(&mut self, adapter: BoxDynTransportAdapter) {
91        tracing::info!(adapter = adapter.name(), "Added transport adapter");
92        self.adapters.push(adapter);
93    }
94
95    /// Get the number of configured adapters.
96    #[must_use]
97    pub fn adapter_count(&self) -> usize {
98        self.adapters.len()
99    }
100
101    /// Check if there are no adapters configured.
102    #[must_use]
103    pub fn is_empty(&self) -> bool {
104        self.adapters.is_empty()
105    }
106
107    /// Deliver an event to all configured transports.
108    ///
109    /// Delivers in parallel and collects results. Delivery failures are accumulated
110    /// in [`DeliveryResult::errors`] rather than propagated as `Err`.
111    ///
112    /// # Errors
113    ///
114    /// Currently infallible — always returns `Ok`. Individual adapter failures are
115    /// captured in [`DeliveryResult::errors`] and do not short-circuit the method.
116    pub async fn deliver_all(
117        &self,
118        event: &SubscriptionEvent,
119        subscription_name: &str,
120    ) -> Result<DeliveryResult, SubscriptionError> {
121        if self.adapters.is_empty() {
122            return Ok(DeliveryResult {
123                successful: 0,
124                failed:     0,
125                errors:     Vec::new(),
126            });
127        }
128
129        let futures: Vec<_> = self
130            .adapters
131            .iter()
132            .map(|adapter| {
133                let name = adapter.name().to_string();
134                async move {
135                    let result = adapter.deliver(event, subscription_name).await;
136                    (name, result)
137                }
138            })
139            .collect();
140
141        let results = futures::future::join_all(futures).await;
142
143        let mut successful = 0;
144        let mut failed = 0;
145        let mut errors = Vec::new();
146
147        for (name, result) in results {
148            match result {
149                Ok(()) => successful += 1,
150                Err(e) => {
151                    failed += 1;
152                    errors.push((name, e.to_string()));
153                },
154            }
155        }
156
157        Ok(DeliveryResult {
158            successful,
159            failed,
160            errors,
161        })
162    }
163
164    /// Check health of all adapters.
165    pub async fn health_check_all(&self) -> Vec<(String, bool)> {
166        let futures: Vec<_> = self
167            .adapters
168            .iter()
169            .map(|adapter| {
170                let name = adapter.name().to_string();
171                async move {
172                    let healthy = adapter.health_check().await;
173                    (name, healthy)
174                }
175            })
176            .collect();
177
178        futures::future::join_all(futures).await
179    }
180}
181
182impl Default for TransportManager {
183    fn default() -> Self {
184        Self::new()
185    }
186}
187
188impl std::fmt::Debug for TransportManager {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        f.debug_struct("TransportManager")
191            .field("adapter_count", &self.adapters.len())
192            .finish()
193    }
194}
195
/// Outcome summary for one event delivered to multiple transports.
#[derive(Debug, Clone)]
pub struct DeliveryResult {
    /// How many adapters delivered the event successfully.
    pub successful: usize,
    /// How many adapters failed to deliver the event.
    pub failed:     usize,
    /// One `(adapter name, error message)` entry per failed delivery.
    pub errors:     Vec<(String, String)>,
}

impl DeliveryResult {
    /// Returns `true` when no delivery failed.
    ///
    /// This is also `true` when nothing was attempted, since `failed` is then zero.
    #[must_use]
    pub const fn all_succeeded(&self) -> bool {
        matches!(self.failed, 0)
    }

    /// Returns `true` when at least one delivery succeeded.
    #[must_use]
    pub const fn any_succeeded(&self) -> bool {
        self.successful != 0
    }
}