1use std::path::{Path, PathBuf};
2
3use super::super::*;
4
5mod local;
6mod shared;
7
/// Status label attached to a webhook observation event.
///
/// Each variant has a stable snake_case text form; see [`Self::as_str`] and
/// [`Self::from_db_value`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebhookObservationStatus {
    Accepted,
    VerificationFailed,
    ReplayRejected,
    ExecutionFailed,
}

impl WebhookObservationStatus {
    /// Returns the snake_case label for this status.
    ///
    /// The labels are exactly the strings accepted by [`Self::from_db_value`].
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::ExecutionFailed => "execution_failed",
            Self::ReplayRejected => "replay_rejected",
            Self::VerificationFailed => "verification_failed",
            Self::Accepted => "accepted",
        }
    }

    /// Parses a status label back into a status.
    ///
    /// Matching is exact (case-sensitive, no trimming).
    ///
    /// # Errors
    ///
    /// Returns a message quoting `value` when it is not one of the labels
    /// produced by [`Self::as_str`].
    pub(crate) fn from_db_value(value: &str) -> Result<Self, String> {
        let status = match value {
            "accepted" => Self::Accepted,
            "verification_failed" => Self::VerificationFailed,
            "replay_rejected" => Self::ReplayRejected,
            "execution_failed" => Self::ExecutionFailed,
            other => return Err(format!("unknown webhook observation status `{other}`")),
        };
        Ok(status)
    }
}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
38pub struct WebhookObservationStatusCounts {
39 pub accepted: usize,
40 pub verification_failed: usize,
41 pub replay_rejected: usize,
42 pub execution_failed: usize,
43}
44
45impl WebhookObservationStatusCounts {
46 fn increment(&mut self, status: WebhookObservationStatus, count: usize) {
47 match status {
48 WebhookObservationStatus::Accepted => self.accepted += count,
49 WebhookObservationStatus::VerificationFailed => self.verification_failed += count,
50 WebhookObservationStatus::ReplayRejected => self.replay_rejected += count,
51 WebhookObservationStatus::ExecutionFailed => self.execution_failed += count,
52 }
53 }
54}
55
/// Identifies which kind of store backs webhook observations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebhookObservationBackendKind {
    LocalSqlite,
    SharedPostgres,
}

impl WebhookObservationBackendKind {
    /// Returns the kebab-case label for this backend kind.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::SharedPostgres => "shared-postgres",
            Self::LocalSqlite => "local-sqlite",
        }
    }
}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct WebhookObservationEvent {
73 pub id: i64,
74 pub recorded_at_unix_seconds: i64,
75 pub app_id: String,
76 pub source: String,
77 pub event: String,
78 pub status: WebhookObservationStatus,
79 pub trace_id: String,
80 pub principal_kind: String,
81 pub principal_id: Option<String>,
82 pub detail: Option<String>,
83}
84
85impl WebhookObservationEvent {
86 fn from_context(
87 source: &str,
88 event: &str,
89 status: WebhookObservationStatus,
90 context: &InvocationContext,
91 detail: Option<String>,
92 ) -> Self {
93 Self {
94 id: 0,
95 recorded_at_unix_seconds: unix_seconds_now(),
96 app_id: context.customer_app.app_id.clone(),
97 source: source.to_string(),
98 event: event.to_string(),
99 status,
100 trace_id: context.trace.trace_id.clone(),
101 principal_kind: context.principal.kind.to_string(),
102 principal_id: context.principal.id.clone(),
103 detail,
104 }
105 }
106
107 fn from_request(
108 app_id: &str,
109 source: &str,
110 event: &str,
111 status: WebhookObservationStatus,
112 request_id: &str,
113 principal_kind: &str,
114 principal_id: Option<&str>,
115 detail: Option<String>,
116 ) -> Self {
117 Self {
118 id: 0,
119 recorded_at_unix_seconds: unix_seconds_now(),
120 app_id: app_id.to_string(),
121 source: source.to_string(),
122 event: event.to_string(),
123 status,
124 trace_id: request_id.to_string(),
125 principal_kind: principal_kind.to_string(),
126 principal_id: principal_id.map(str::to_string),
127 detail,
128 }
129 }
130}
131
/// Summary of a webhook observation backend: where it lives, how many entries
/// it reports, and a list of recent events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebhookObservationSnapshot {
    /// Kind of backend the snapshot was taken from.
    pub backend: WebhookObservationBackendKind,
    /// Label returned by the backend's `location_label()`.
    pub location: String,
    /// Store path reported by the backend; `None` for the shared backend.
    pub path: Option<PathBuf>,
    /// Entry count reported by the backend.
    pub entry_count: usize,
    /// Tallies broken down by observation status.
    pub status_counts: WebhookObservationStatusCounts,
    /// Events returned by the backend's `recent` query for the requested limit.
    pub recent_events: Vec<WebhookObservationEvent>,
}
141
142#[derive(Debug, Clone)]
143pub(super) struct RuntimeWebhookObservationBackend {
144 backend: WebhookObservationBackend,
145}
146
147impl RuntimeWebhookObservationBackend {
148 pub(super) fn open(plan: &RuntimePlan) -> Self {
149 let backend = match plan.metadata_audit_backend_selection() {
150 crate::plan::MetadataAuditBackendSelection::SharedPostgres { runtime } => {
151 WebhookObservationBackend::shared(shared::SharedWebhookObservationStore::open(
152 runtime,
153 ))
154 }
155 crate::plan::MetadataAuditBackendSelection::LocalSqlite { root, namespace } => {
156 WebhookObservationBackend::local(local::LocalWebhookObservationStore::open(
157 root, namespace,
158 ))
159 }
160 };
161 Self { backend }
162 }
163
164 #[cfg(test)]
165 pub(super) fn with_local_root(root: impl Into<PathBuf>, namespace: impl Into<String>) -> Self {
166 Self {
167 backend: WebhookObservationBackend::local(local::LocalWebhookObservationStore::open(
168 root.into(),
169 namespace.into(),
170 )),
171 }
172 }
173
174 pub(super) fn record(
175 &self,
176 source: &str,
177 event: &str,
178 status: WebhookObservationStatus,
179 context: &InvocationContext,
180 detail: Option<String>,
181 ) -> Result<(), String> {
182 self.backend.insert(&WebhookObservationEvent::from_context(
183 source, event, status, context, detail,
184 ))
185 }
186
187 pub(super) fn record_request(
188 &self,
189 app_id: &str,
190 source: &str,
191 event: &str,
192 status: WebhookObservationStatus,
193 request_id: &str,
194 principal_kind: &str,
195 principal_id: Option<&str>,
196 detail: Option<String>,
197 ) -> Result<(), String> {
198 self.backend.insert(&WebhookObservationEvent::from_request(
199 app_id,
200 source,
201 event,
202 status,
203 request_id,
204 principal_kind,
205 principal_id,
206 detail,
207 ))
208 }
209
210 pub(super) fn claim_delivery(
211 &self,
212 app_id: &str,
213 route_name: &str,
214 source: &str,
215 delivery_id: &str,
216 request_id: &str,
217 recorded_at_unix_seconds: i64,
218 ) -> Result<bool, String> {
219 self.backend.claim_delivery(
220 app_id,
221 route_name,
222 source,
223 delivery_id,
224 request_id,
225 recorded_at_unix_seconds,
226 )
227 }
228
229 pub(super) fn snapshot(&self, limit: usize) -> Result<WebhookObservationSnapshot, String> {
230 Ok(WebhookObservationSnapshot {
231 backend: self.backend.kind(),
232 location: self.backend.location_label(),
233 path: self.backend.path().map(Path::to_path_buf),
234 entry_count: self.backend.entry_count()?,
235 status_counts: self.backend.status_counts()?,
236 recent_events: self.backend.recent(limit)?,
237 })
238 }
239}
240
241#[derive(Debug, Clone)]
242enum WebhookObservationBackend {
243 Local(local::LocalWebhookObservationStore),
244 Shared(shared::SharedWebhookObservationStore),
245}
246
247impl WebhookObservationBackend {
248 fn local(store: local::LocalWebhookObservationStore) -> Self {
249 Self::Local(store)
250 }
251
252 fn shared(store: shared::SharedWebhookObservationStore) -> Self {
253 Self::Shared(store)
254 }
255
256 fn kind(&self) -> WebhookObservationBackendKind {
257 match self {
258 Self::Local(_) => WebhookObservationBackendKind::LocalSqlite,
259 Self::Shared(_) => WebhookObservationBackendKind::SharedPostgres,
260 }
261 }
262
263 fn location_label(&self) -> String {
264 match self {
265 Self::Local(store) => store.location_label(),
266 Self::Shared(store) => store.location_label(),
267 }
268 }
269
270 fn path(&self) -> Option<&Path> {
271 match self {
272 Self::Local(store) => Some(store.path()),
273 Self::Shared(_) => None,
274 }
275 }
276
277 fn insert(&self, record: &WebhookObservationEvent) -> Result<(), String> {
278 match self {
279 Self::Local(store) => store.insert(record),
280 Self::Shared(store) => store.insert(record),
281 }
282 }
283
284 fn entry_count(&self) -> Result<usize, String> {
285 match self {
286 Self::Local(store) => store.count(),
287 Self::Shared(store) => store.count(),
288 }
289 }
290
291 fn status_counts(&self) -> Result<WebhookObservationStatusCounts, String> {
292 match self {
293 Self::Local(store) => store.status_counts(),
294 Self::Shared(store) => store.status_counts(),
295 }
296 }
297
298 fn recent(&self, limit: usize) -> Result<Vec<WebhookObservationEvent>, String> {
299 match self {
300 Self::Local(store) => store.recent(limit),
301 Self::Shared(store) => store.recent(limit),
302 }
303 }
304
305 fn claim_delivery(
306 &self,
307 app_id: &str,
308 route_name: &str,
309 source: &str,
310 delivery_id: &str,
311 request_id: &str,
312 recorded_at_unix_seconds: i64,
313 ) -> Result<bool, String> {
314 match self {
315 Self::Local(store) => store.claim_delivery(
316 app_id,
317 route_name,
318 source,
319 delivery_id,
320 request_id,
321 recorded_at_unix_seconds,
322 ),
323 Self::Shared(store) => store.claim_delivery(
324 app_id,
325 route_name,
326 source,
327 delivery_id,
328 request_id,
329 recorded_at_unix_seconds,
330 ),
331 }
332 }
333}
334
335pub(crate) fn decode_status_counts(
336 rows: impl IntoIterator<Item = (String, i64)>,
337) -> Result<WebhookObservationStatusCounts, String> {
338 let mut counts = WebhookObservationStatusCounts::default();
339 for (status, count) in rows {
340 let count = usize::try_from(count)
341 .map_err(|_| "webhook observation count overflowed usize".to_string())?;
342 counts.increment(WebhookObservationStatus::from_db_value(&status)?, count);
343 }
344 Ok(counts)
345}
346
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, `0` is returned. A
/// second count too large for `i64` saturates at `i64::MAX` instead of
/// wrapping to a negative value.
fn unix_seconds_now() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `duration_since` errs when the clock is before the epoch; fall back
        // to a zero duration rather than failing the caller.
        .unwrap_or_default();

    // Checked conversion: a plain `as i64` cast would silently wrap.
    i64::try_from(since_epoch.as_secs()).unwrap_or(i64::MAX)
}