Skip to main content

fraiseql_core/runtime/subscription/
transport.rs

1use super::{SubscriptionError, types::SubscriptionEvent};
2
3// =============================================================================
4// Transport Adapters
5// =============================================================================
6
7/// Transport adapter trait for delivering subscription events.
8///
9/// Transport adapters are responsible for delivering events to external systems.
10/// Each adapter implements a specific delivery mechanism (HTTP, Kafka, etc.).
11///
12/// # Implementors
13///
14/// - [`super::WebhookAdapter`] - HTTP POST delivery with retry logic
15/// - [`super::KafkaAdapter`] - Apache Kafka event streaming
16#[async_trait::async_trait]
17pub trait TransportAdapter: Send + Sync {
18    /// Deliver an event to the transport.
19    ///
20    /// # Arguments
21    ///
22    /// * `event` - The subscription event to deliver
23    /// * `subscription_name` - Name of the subscription
24    ///
25    /// # Returns
26    ///
27    /// `Ok(())` on successful delivery, `Err` on failure.
28    async fn deliver(
29        &self,
30        event: &SubscriptionEvent,
31        subscription_name: &str,
32    ) -> Result<(), SubscriptionError>;
33
34    /// Get the adapter name for logging/metrics.
35    fn name(&self) -> &'static str;
36
37    /// Check if the adapter is healthy/connected.
38    async fn health_check(&self) -> bool;
39}
40
41/// Multi-transport delivery manager.
42///
43/// Manages multiple transport adapters and delivers events to all configured
44/// destinations in parallel.
45///
46/// # Example
47///
48/// ```ignore
49/// use fraiseql_core::runtime::subscription::{
50///     TransportManager, WebhookAdapter, WebhookConfig,
51/// };
52///
53/// let mut manager = TransportManager::new();
54///
55/// // Add webhook adapter
56/// let webhook = WebhookAdapter::new(WebhookConfig::new("https://api.example.com/events"));
57/// manager.add_adapter(Box::new(webhook));
58///
59/// // Deliver to all transports
60/// manager.deliver_all(&event, "orderCreated").await?;
61/// ```
62pub struct TransportManager {
63    adapters: Vec<Box<dyn TransportAdapter>>,
64}
65
66impl TransportManager {
67    /// Create a new transport manager.
68    #[must_use]
69    pub fn new() -> Self {
70        Self {
71            adapters: Vec::new(),
72        }
73    }
74
75    /// Add a transport adapter.
76    pub fn add_adapter(&mut self, adapter: Box<dyn TransportAdapter>) {
77        tracing::info!(adapter = adapter.name(), "Added transport adapter");
78        self.adapters.push(adapter);
79    }
80
81    /// Get the number of configured adapters.
82    #[must_use]
83    pub fn adapter_count(&self) -> usize {
84        self.adapters.len()
85    }
86
87    /// Check if there are no adapters configured.
88    #[must_use]
89    pub fn is_empty(&self) -> bool {
90        self.adapters.is_empty()
91    }
92
93    /// Deliver an event to all configured transports.
94    ///
95    /// Delivers in parallel and collects results. Returns Ok if at least one
96    /// delivery succeeded, or the last error if all failed.
97    pub async fn deliver_all(
98        &self,
99        event: &SubscriptionEvent,
100        subscription_name: &str,
101    ) -> Result<DeliveryResult, SubscriptionError> {
102        if self.adapters.is_empty() {
103            return Ok(DeliveryResult {
104                successful: 0,
105                failed:     0,
106                errors:     Vec::new(),
107            });
108        }
109
110        let futures: Vec<_> = self
111            .adapters
112            .iter()
113            .map(|adapter| {
114                let name = adapter.name().to_string();
115                async move {
116                    let result = adapter.deliver(event, subscription_name).await;
117                    (name, result)
118                }
119            })
120            .collect();
121
122        let results = futures::future::join_all(futures).await;
123
124        let mut successful = 0;
125        let mut failed = 0;
126        let mut errors = Vec::new();
127
128        for (name, result) in results {
129            match result {
130                Ok(()) => successful += 1,
131                Err(e) => {
132                    failed += 1;
133                    errors.push((name, e.to_string()));
134                },
135            }
136        }
137
138        Ok(DeliveryResult {
139            successful,
140            failed,
141            errors,
142        })
143    }
144
145    /// Check health of all adapters.
146    pub async fn health_check_all(&self) -> Vec<(String, bool)> {
147        let futures: Vec<_> = self
148            .adapters
149            .iter()
150            .map(|adapter| {
151                let name = adapter.name().to_string();
152                async move {
153                    let healthy = adapter.health_check().await;
154                    (name, healthy)
155                }
156            })
157            .collect();
158
159        futures::future::join_all(futures).await
160    }
161}
162
163impl Default for TransportManager {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
169impl std::fmt::Debug for TransportManager {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        f.debug_struct("TransportManager")
172            .field("adapter_count", &self.adapters.len())
173            .finish()
174    }
175}
176
/// Result of delivering an event to multiple transports.
///
/// Holds the per-call success/failure counts together with the error
/// message of every failed delivery.
#[derive(Debug, Clone)]
pub struct DeliveryResult {
    /// Number of successful deliveries.
    pub successful: usize,
    /// Number of failed deliveries.
    pub failed:     usize,
    /// Errors from failed deliveries (adapter name, error message).
    pub errors:     Vec<(String, String)>,
}

impl DeliveryResult {
    /// Check if all deliveries succeeded.
    ///
    /// Returns `true` whenever `failed` is zero, which includes a result
    /// with no deliveries at all.
    #[must_use]
    pub fn all_succeeded(&self) -> bool {
        self.failed == 0
    }

    /// Check if at least one delivery succeeded.
    ///
    /// Returns `false` for a result with no deliveries at all.
    #[must_use]
    pub fn any_succeeded(&self) -> bool {
        self.successful > 0
    }
}