Skip to main content

coil_runtime/wasm/host/services/webhooks/
mod.rs

1use std::path::{Path, PathBuf};
2
3use super::super::*;
4
5mod local;
6mod shared;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum WebhookObservationStatus {
10    Accepted,
11    VerificationFailed,
12    ReplayRejected,
13    ExecutionFailed,
14}
15
16impl WebhookObservationStatus {
17    pub const fn as_str(&self) -> &'static str {
18        match self {
19            Self::Accepted => "accepted",
20            Self::VerificationFailed => "verification_failed",
21            Self::ReplayRejected => "replay_rejected",
22            Self::ExecutionFailed => "execution_failed",
23        }
24    }
25
26    pub(crate) fn from_db_value(value: &str) -> Result<Self, String> {
27        match value {
28            "accepted" => Ok(Self::Accepted),
29            "verification_failed" => Ok(Self::VerificationFailed),
30            "replay_rejected" => Ok(Self::ReplayRejected),
31            "execution_failed" => Ok(Self::ExecutionFailed),
32            other => Err(format!("unknown webhook observation status `{other}`")),
33        }
34    }
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
38pub struct WebhookObservationStatusCounts {
39    pub accepted: usize,
40    pub verification_failed: usize,
41    pub replay_rejected: usize,
42    pub execution_failed: usize,
43}
44
45impl WebhookObservationStatusCounts {
46    fn increment(&mut self, status: WebhookObservationStatus, count: usize) {
47        match status {
48            WebhookObservationStatus::Accepted => self.accepted += count,
49            WebhookObservationStatus::VerificationFailed => self.verification_failed += count,
50            WebhookObservationStatus::ReplayRejected => self.replay_rejected += count,
51            WebhookObservationStatus::ExecutionFailed => self.execution_failed += count,
52        }
53    }
54}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum WebhookObservationBackendKind {
58    LocalSqlite,
59    SharedPostgres,
60}
61
62impl WebhookObservationBackendKind {
63    pub const fn as_str(&self) -> &'static str {
64        match self {
65            Self::LocalSqlite => "local-sqlite",
66            Self::SharedPostgres => "shared-postgres",
67        }
68    }
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct WebhookObservationEvent {
73    pub id: i64,
74    pub recorded_at_unix_seconds: i64,
75    pub app_id: String,
76    pub source: String,
77    pub event: String,
78    pub status: WebhookObservationStatus,
79    pub trace_id: String,
80    pub principal_kind: String,
81    pub principal_id: Option<String>,
82    pub detail: Option<String>,
83}
84
85impl WebhookObservationEvent {
86    fn from_context(
87        source: &str,
88        event: &str,
89        status: WebhookObservationStatus,
90        context: &InvocationContext,
91        detail: Option<String>,
92    ) -> Self {
93        Self {
94            id: 0,
95            recorded_at_unix_seconds: unix_seconds_now(),
96            app_id: context.customer_app.app_id.clone(),
97            source: source.to_string(),
98            event: event.to_string(),
99            status,
100            trace_id: context.trace.trace_id.clone(),
101            principal_kind: context.principal.kind.to_string(),
102            principal_id: context.principal.id.clone(),
103            detail,
104        }
105    }
106
107    fn from_request(
108        app_id: &str,
109        source: &str,
110        event: &str,
111        status: WebhookObservationStatus,
112        request_id: &str,
113        principal_kind: &str,
114        principal_id: Option<&str>,
115        detail: Option<String>,
116    ) -> Self {
117        Self {
118            id: 0,
119            recorded_at_unix_seconds: unix_seconds_now(),
120            app_id: app_id.to_string(),
121            source: source.to_string(),
122            event: event.to_string(),
123            status,
124            trace_id: request_id.to_string(),
125            principal_kind: principal_kind.to_string(),
126            principal_id: principal_id.map(str::to_string),
127            detail,
128        }
129    }
130}
131
132#[derive(Debug, Clone, PartialEq, Eq)]
133pub struct WebhookObservationSnapshot {
134    pub backend: WebhookObservationBackendKind,
135    pub location: String,
136    pub path: Option<PathBuf>,
137    pub entry_count: usize,
138    pub status_counts: WebhookObservationStatusCounts,
139    pub recent_events: Vec<WebhookObservationEvent>,
140}
141
142#[derive(Debug, Clone)]
143pub(super) struct RuntimeWebhookObservationBackend {
144    backend: WebhookObservationBackend,
145}
146
147impl RuntimeWebhookObservationBackend {
148    pub(super) fn open(plan: &RuntimePlan) -> Self {
149        let backend = match plan.metadata_audit_backend_selection() {
150            crate::plan::MetadataAuditBackendSelection::SharedPostgres { runtime } => {
151                WebhookObservationBackend::shared(shared::SharedWebhookObservationStore::open(
152                    runtime,
153                ))
154            }
155            crate::plan::MetadataAuditBackendSelection::LocalSqlite { root, namespace } => {
156                WebhookObservationBackend::local(local::LocalWebhookObservationStore::open(
157                    root, namespace,
158                ))
159            }
160        };
161        Self { backend }
162    }
163
164    #[cfg(test)]
165    pub(super) fn with_local_root(root: impl Into<PathBuf>, namespace: impl Into<String>) -> Self {
166        Self {
167            backend: WebhookObservationBackend::local(local::LocalWebhookObservationStore::open(
168                root.into(),
169                namespace.into(),
170            )),
171        }
172    }
173
174    pub(super) fn record(
175        &self,
176        source: &str,
177        event: &str,
178        status: WebhookObservationStatus,
179        context: &InvocationContext,
180        detail: Option<String>,
181    ) -> Result<(), String> {
182        self.backend.insert(&WebhookObservationEvent::from_context(
183            source, event, status, context, detail,
184        ))
185    }
186
187    pub(super) fn record_request(
188        &self,
189        app_id: &str,
190        source: &str,
191        event: &str,
192        status: WebhookObservationStatus,
193        request_id: &str,
194        principal_kind: &str,
195        principal_id: Option<&str>,
196        detail: Option<String>,
197    ) -> Result<(), String> {
198        self.backend.insert(&WebhookObservationEvent::from_request(
199            app_id,
200            source,
201            event,
202            status,
203            request_id,
204            principal_kind,
205            principal_id,
206            detail,
207        ))
208    }
209
210    pub(super) fn claim_delivery(
211        &self,
212        app_id: &str,
213        route_name: &str,
214        source: &str,
215        delivery_id: &str,
216        request_id: &str,
217        recorded_at_unix_seconds: i64,
218    ) -> Result<bool, String> {
219        self.backend.claim_delivery(
220            app_id,
221            route_name,
222            source,
223            delivery_id,
224            request_id,
225            recorded_at_unix_seconds,
226        )
227    }
228
229    pub(super) fn snapshot(&self, limit: usize) -> Result<WebhookObservationSnapshot, String> {
230        Ok(WebhookObservationSnapshot {
231            backend: self.backend.kind(),
232            location: self.backend.location_label(),
233            path: self.backend.path().map(Path::to_path_buf),
234            entry_count: self.backend.entry_count()?,
235            status_counts: self.backend.status_counts()?,
236            recent_events: self.backend.recent(limit)?,
237        })
238    }
239}
240
241#[derive(Debug, Clone)]
242enum WebhookObservationBackend {
243    Local(local::LocalWebhookObservationStore),
244    Shared(shared::SharedWebhookObservationStore),
245}
246
247impl WebhookObservationBackend {
248    fn local(store: local::LocalWebhookObservationStore) -> Self {
249        Self::Local(store)
250    }
251
252    fn shared(store: shared::SharedWebhookObservationStore) -> Self {
253        Self::Shared(store)
254    }
255
256    fn kind(&self) -> WebhookObservationBackendKind {
257        match self {
258            Self::Local(_) => WebhookObservationBackendKind::LocalSqlite,
259            Self::Shared(_) => WebhookObservationBackendKind::SharedPostgres,
260        }
261    }
262
263    fn location_label(&self) -> String {
264        match self {
265            Self::Local(store) => store.location_label(),
266            Self::Shared(store) => store.location_label(),
267        }
268    }
269
270    fn path(&self) -> Option<&Path> {
271        match self {
272            Self::Local(store) => Some(store.path()),
273            Self::Shared(_) => None,
274        }
275    }
276
277    fn insert(&self, record: &WebhookObservationEvent) -> Result<(), String> {
278        match self {
279            Self::Local(store) => store.insert(record),
280            Self::Shared(store) => store.insert(record),
281        }
282    }
283
284    fn entry_count(&self) -> Result<usize, String> {
285        match self {
286            Self::Local(store) => store.count(),
287            Self::Shared(store) => store.count(),
288        }
289    }
290
291    fn status_counts(&self) -> Result<WebhookObservationStatusCounts, String> {
292        match self {
293            Self::Local(store) => store.status_counts(),
294            Self::Shared(store) => store.status_counts(),
295        }
296    }
297
298    fn recent(&self, limit: usize) -> Result<Vec<WebhookObservationEvent>, String> {
299        match self {
300            Self::Local(store) => store.recent(limit),
301            Self::Shared(store) => store.recent(limit),
302        }
303    }
304
305    fn claim_delivery(
306        &self,
307        app_id: &str,
308        route_name: &str,
309        source: &str,
310        delivery_id: &str,
311        request_id: &str,
312        recorded_at_unix_seconds: i64,
313    ) -> Result<bool, String> {
314        match self {
315            Self::Local(store) => store.claim_delivery(
316                app_id,
317                route_name,
318                source,
319                delivery_id,
320                request_id,
321                recorded_at_unix_seconds,
322            ),
323            Self::Shared(store) => store.claim_delivery(
324                app_id,
325                route_name,
326                source,
327                delivery_id,
328                request_id,
329                recorded_at_unix_seconds,
330            ),
331        }
332    }
333}
334
335pub(crate) fn decode_status_counts(
336    rows: impl IntoIterator<Item = (String, i64)>,
337) -> Result<WebhookObservationStatusCounts, String> {
338    let mut counts = WebhookObservationStatusCounts::default();
339    for (status, count) in rows {
340        let count = usize::try_from(count)
341            .map_err(|_| "webhook observation count overflowed usize".to_string())?;
342        counts.increment(WebhookObservationStatus::from_db_value(&status)?, count);
343    }
344    Ok(counts)
345}
346
347fn unix_seconds_now() -> i64 {
348    use std::time::{SystemTime, UNIX_EPOCH};
349
350    SystemTime::now()
351        .duration_since(UNIX_EPOCH)
352        .unwrap_or_default()
353        .as_secs() as i64
354}