fraiseql_core/runtime/subscription/transport.rs
1use async_trait::async_trait;
2
3use super::{SubscriptionError, types::SubscriptionEvent};
4
5// =============================================================================
6// Transport Adapters
7// =============================================================================
8
/// Transport adapter trait for delivering subscription events.
///
/// Transport adapters are responsible for delivering events to external systems.
/// Each adapter implements a specific delivery mechanism (HTTP, Kafka, etc.).
///
/// # Implementors
///
/// - [`super::WebhookAdapter`] - HTTP POST delivery with retry logic
/// - [`super::KafkaAdapter`] - Apache Kafka event streaming
// Reason: used as dyn Trait (Box<dyn TransportAdapter>); async_trait ensures Send bounds and
// dyn-compatibility.
// async_trait: dyn-dispatch required; remove when RTN + Send is stable (RFC 3425)
#[async_trait]
pub trait TransportAdapter: Send + Sync {
    /// Deliver an event to the transport.
    ///
    /// # Arguments
    ///
    /// * `event` - The subscription event to deliver
    /// * `subscription_name` - Name of the subscription
    ///
    /// # Returns
    ///
    /// `Ok(())` on successful delivery, `Err` on failure.
    ///
    /// # Errors
    ///
    /// Returns a [`SubscriptionError`] when the adapter could not deliver the event.
    async fn deliver(
        &self,
        event: &SubscriptionEvent,
        subscription_name: &str,
    ) -> Result<(), SubscriptionError>;

    /// Get the adapter name for logging/metrics.
    ///
    /// [`TransportManager`] also uses this name to label per-adapter delivery
    /// errors and health-check results.
    fn name(&self) -> &'static str;

    /// Check if the adapter is healthy/connected.
    ///
    /// Returns `true` when the adapter considers itself healthy. What counts as
    /// healthy is defined by each implementation.
    async fn health_check(&self) -> bool;
}
44
/// Type alias for boxed dynamic transport adapter.
///
/// Used to store transport adapters without generic type parameters, for example
/// as the element type of the adapter list held by [`TransportManager`].
pub type BoxDynTransportAdapter = Box<dyn TransportAdapter>;
49
/// Multi-transport delivery manager.
///
/// Manages multiple transport adapters and delivers events to all configured
/// destinations concurrently (all deliveries are awaited together; no tasks are
/// spawned by the manager itself).
///
/// # Example
///
/// ```no_run
/// // Requires: live transport destination (webhook/NATS/etc).
/// use fraiseql_core::runtime::subscription::{
///     TransportManager, WebhookAdapter, WebhookTransportConfig, SubscriptionEvent,
/// };
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let event: SubscriptionEvent = panic!("example");
/// let mut manager = TransportManager::new();
///
/// // Add webhook adapter
/// let webhook = WebhookAdapter::new(WebhookTransportConfig::new("https://api.example.com/events"))?;
/// manager.add_adapter(Box::new(webhook));
///
/// // Deliver to all transports
/// manager.deliver_all(&event, "orderCreated").await?;
/// # Ok(())
/// # }
/// ```
pub struct TransportManager {
    /// Registered adapters, in the order they were added via `add_adapter`.
    adapters: Vec<BoxDynTransportAdapter>,
}
79
80impl TransportManager {
81 /// Create a new transport manager.
82 #[must_use]
83 pub fn new() -> Self {
84 Self {
85 adapters: Vec::new(),
86 }
87 }
88
89 /// Add a transport adapter.
90 pub fn add_adapter(&mut self, adapter: BoxDynTransportAdapter) {
91 tracing::info!(adapter = adapter.name(), "Added transport adapter");
92 self.adapters.push(adapter);
93 }
94
95 /// Get the number of configured adapters.
96 #[must_use]
97 pub fn adapter_count(&self) -> usize {
98 self.adapters.len()
99 }
100
101 /// Check if there are no adapters configured.
102 #[must_use]
103 pub fn is_empty(&self) -> bool {
104 self.adapters.is_empty()
105 }
106
107 /// Deliver an event to all configured transports.
108 ///
109 /// Delivers in parallel and collects results. Delivery failures are accumulated
110 /// in [`DeliveryResult::errors`] rather than propagated as `Err`.
111 ///
112 /// # Errors
113 ///
114 /// Currently infallible — always returns `Ok`. Individual adapter failures are
115 /// captured in [`DeliveryResult::errors`] and do not short-circuit the method.
116 pub async fn deliver_all(
117 &self,
118 event: &SubscriptionEvent,
119 subscription_name: &str,
120 ) -> Result<DeliveryResult, SubscriptionError> {
121 if self.adapters.is_empty() {
122 return Ok(DeliveryResult {
123 successful: 0,
124 failed: 0,
125 errors: Vec::new(),
126 });
127 }
128
129 let futures: Vec<_> = self
130 .adapters
131 .iter()
132 .map(|adapter| {
133 let name = adapter.name().to_string();
134 async move {
135 let result = adapter.deliver(event, subscription_name).await;
136 (name, result)
137 }
138 })
139 .collect();
140
141 let results = futures::future::join_all(futures).await;
142
143 let mut successful = 0;
144 let mut failed = 0;
145 let mut errors = Vec::new();
146
147 for (name, result) in results {
148 match result {
149 Ok(()) => successful += 1,
150 Err(e) => {
151 failed += 1;
152 errors.push((name, e.to_string()));
153 },
154 }
155 }
156
157 Ok(DeliveryResult {
158 successful,
159 failed,
160 errors,
161 })
162 }
163
164 /// Check health of all adapters.
165 pub async fn health_check_all(&self) -> Vec<(String, bool)> {
166 let futures: Vec<_> = self
167 .adapters
168 .iter()
169 .map(|adapter| {
170 let name = adapter.name().to_string();
171 async move {
172 let healthy = adapter.health_check().await;
173 (name, healthy)
174 }
175 })
176 .collect();
177
178 futures::future::join_all(futures).await
179 }
180}
181
182impl Default for TransportManager {
183 fn default() -> Self {
184 Self::new()
185 }
186}
187
188impl std::fmt::Debug for TransportManager {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("TransportManager")
191 .field("adapter_count", &self.adapters.len())
192 .finish()
193 }
194}
195
/// Result of delivering an event to multiple transports.
///
/// The `Default` value is the empty result: no successes, no failures and no
/// errors. Two results compare equal when all three fields are equal.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DeliveryResult {
    /// Number of successful deliveries.
    pub successful: usize,
    /// Number of failed deliveries.
    pub failed: usize,
    /// Errors from failed deliveries (adapter name, error message).
    pub errors: Vec<(String, String)>,
}

impl DeliveryResult {
    /// Check if all deliveries succeeded.
    ///
    /// This is `true` whenever `failed` is zero, which includes the case where
    /// no delivery was attempted at all.
    #[must_use]
    pub const fn all_succeeded(&self) -> bool {
        self.failed == 0
    }

    /// Check if at least one delivery succeeded.
    #[must_use]
    pub const fn any_succeeded(&self) -> bool {
        self.successful > 0
    }
}