fraiseql_core/runtime/subscription/transport.rs
1use super::{SubscriptionError, types::SubscriptionEvent};
2
3// =============================================================================
4// Transport Adapters
5// =============================================================================
6
7/// Transport adapter trait for delivering subscription events.
8///
9/// Transport adapters are responsible for delivering events to external systems.
10/// Each adapter implements a specific delivery mechanism (HTTP, Kafka, etc.).
11///
12/// # Implementors
13///
14/// - [`super::WebhookAdapter`] - HTTP POST delivery with retry logic
15/// - [`super::KafkaAdapter`] - Apache Kafka event streaming
16#[async_trait::async_trait]
17pub trait TransportAdapter: Send + Sync {
18 /// Deliver an event to the transport.
19 ///
20 /// # Arguments
21 ///
22 /// * `event` - The subscription event to deliver
23 /// * `subscription_name` - Name of the subscription
24 ///
25 /// # Returns
26 ///
27 /// `Ok(())` on successful delivery, `Err` on failure.
28 async fn deliver(
29 &self,
30 event: &SubscriptionEvent,
31 subscription_name: &str,
32 ) -> Result<(), SubscriptionError>;
33
34 /// Get the adapter name for logging/metrics.
35 fn name(&self) -> &'static str;
36
37 /// Check if the adapter is healthy/connected.
38 async fn health_check(&self) -> bool;
39}
40
41/// Multi-transport delivery manager.
42///
43/// Manages multiple transport adapters and delivers events to all configured
44/// destinations in parallel.
45///
46/// # Example
47///
48/// ```ignore
49/// use fraiseql_core::runtime::subscription::{
50/// TransportManager, WebhookAdapter, WebhookConfig,
51/// };
52///
53/// let mut manager = TransportManager::new();
54///
55/// // Add webhook adapter
56/// let webhook = WebhookAdapter::new(WebhookConfig::new("https://api.example.com/events"));
57/// manager.add_adapter(Box::new(webhook));
58///
59/// // Deliver to all transports
60/// manager.deliver_all(&event, "orderCreated").await?;
61/// ```
62pub struct TransportManager {
63 adapters: Vec<Box<dyn TransportAdapter>>,
64}
65
66impl TransportManager {
67 /// Create a new transport manager.
68 #[must_use]
69 pub fn new() -> Self {
70 Self {
71 adapters: Vec::new(),
72 }
73 }
74
75 /// Add a transport adapter.
76 pub fn add_adapter(&mut self, adapter: Box<dyn TransportAdapter>) {
77 tracing::info!(adapter = adapter.name(), "Added transport adapter");
78 self.adapters.push(adapter);
79 }
80
81 /// Get the number of configured adapters.
82 #[must_use]
83 pub fn adapter_count(&self) -> usize {
84 self.adapters.len()
85 }
86
87 /// Check if there are no adapters configured.
88 #[must_use]
89 pub fn is_empty(&self) -> bool {
90 self.adapters.is_empty()
91 }
92
93 /// Deliver an event to all configured transports.
94 ///
95 /// Delivers in parallel and collects results. Returns Ok if at least one
96 /// delivery succeeded, or the last error if all failed.
97 pub async fn deliver_all(
98 &self,
99 event: &SubscriptionEvent,
100 subscription_name: &str,
101 ) -> Result<DeliveryResult, SubscriptionError> {
102 if self.adapters.is_empty() {
103 return Ok(DeliveryResult {
104 successful: 0,
105 failed: 0,
106 errors: Vec::new(),
107 });
108 }
109
110 let futures: Vec<_> = self
111 .adapters
112 .iter()
113 .map(|adapter| {
114 let name = adapter.name().to_string();
115 async move {
116 let result = adapter.deliver(event, subscription_name).await;
117 (name, result)
118 }
119 })
120 .collect();
121
122 let results = futures::future::join_all(futures).await;
123
124 let mut successful = 0;
125 let mut failed = 0;
126 let mut errors = Vec::new();
127
128 for (name, result) in results {
129 match result {
130 Ok(()) => successful += 1,
131 Err(e) => {
132 failed += 1;
133 errors.push((name, e.to_string()));
134 },
135 }
136 }
137
138 Ok(DeliveryResult {
139 successful,
140 failed,
141 errors,
142 })
143 }
144
145 /// Check health of all adapters.
146 pub async fn health_check_all(&self) -> Vec<(String, bool)> {
147 let futures: Vec<_> = self
148 .adapters
149 .iter()
150 .map(|adapter| {
151 let name = adapter.name().to_string();
152 async move {
153 let healthy = adapter.health_check().await;
154 (name, healthy)
155 }
156 })
157 .collect();
158
159 futures::future::join_all(futures).await
160 }
161}
162
163impl Default for TransportManager {
164 fn default() -> Self {
165 Self::new()
166 }
167}
168
169impl std::fmt::Debug for TransportManager {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("TransportManager")
172 .field("adapter_count", &self.adapters.len())
173 .finish()
174 }
175}
176
/// Aggregated outcome of delivering one event to multiple transports.
///
/// `Default` yields the empty result (no successes, no failures, no errors),
/// and `PartialEq`/`Eq` allow two results to be compared directly, e.g. in
/// tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DeliveryResult {
    /// Number of successful deliveries.
    pub successful: usize,
    /// Number of failed deliveries.
    pub failed: usize,
    /// Errors from failed deliveries as `(adapter name, error message)` pairs.
    pub errors: Vec<(String, String)>,
}
187
188impl DeliveryResult {
189 /// Check if all deliveries succeeded.
190 #[must_use]
191 pub fn all_succeeded(&self) -> bool {
192 self.failed == 0
193 }
194
195 /// Check if at least one delivery succeeded.
196 #[must_use]
197 pub fn any_succeeded(&self) -> bool {
198 self.successful > 0
199 }
200}