1#![cfg_attr(not(feature = "unstable-engine"), allow(dead_code))]
7
8use bytes::Bytes;
9
10use crate::{
11 delete_ops::{self, DeleteRequest},
12 engine::{Engine, EngineError},
13 engine_ops::EngineOps,
14 engine_types::{AccessTier, Preconditions, Representation, ValidatedWorldPath, WriteResult},
15 world_ops,
16};
17
18pub trait EngineWriteTraceHooks {
24 fn lock_acquired(&self) {}
26 fn quota_check(&self, _used: usize, _quota: usize) {}
29 fn sqlite_committed(&self, _etag: &str) {}
31 fn notify_sent(&self) {}
33}
34
35pub trait EngineDeleteTraceHooks {
41 fn lock_acquired(&self, _world: &str) {}
43 fn audit_intent(&self) {}
45 fn audit_intent_failed(&self, _err: &str) {}
50 fn read_cache_drained(&self) {}
52 fn physical_deleted(&self) {}
54 fn counter_decremented(&self) {}
56 fn notify_sent(&self) {}
58 fn audit_commit_failed(&self, _err: &str) {}
63 fn audit_commit_failed_event_logged(&self) {}
66 fn audit_commit_failed_event_failed(&self, _err: &str) {}
71 fn audit_commit(&self) {}
73}
74
75#[derive(Clone, Default)]
83#[non_exhaustive]
84pub struct DeleteMetadata {
85 pub content_type: String,
87 pub headers: Vec<(String, String)>,
89}
90
91impl DeleteMetadata {
92 pub fn new(content_type: impl Into<String>, headers: Vec<(String, String)>) -> Self {
94 Self {
95 content_type: content_type.into(),
96 headers,
97 }
98 }
99}
100
101struct PublicWriteTrace<'a, H: ?Sized>(&'a H);
102
103impl<H: EngineWriteTraceHooks + ?Sized> world_ops::WriteTraceHooks for PublicWriteTrace<'_, H> {
104 fn lock_acquired(&self) {
105 self.0.lock_acquired();
106 }
107
108 fn quota_check(&self, used: usize, quota: usize) {
109 self.0.quota_check(used, quota);
110 }
111
112 fn sqlite_committed(&self, etag: &str) {
113 self.0.sqlite_committed(etag);
114 }
115
116 fn notify_sent(&self) {
117 self.0.notify_sent();
118 }
119}
120
121struct PublicDeleteTrace<'a, H: ?Sized>(&'a H);
122
123impl<H: EngineDeleteTraceHooks + ?Sized> delete_ops::DeleteTraceHooks for PublicDeleteTrace<'_, H> {
124 fn lock_acquired(&self, world: &str) {
125 self.0.lock_acquired(world);
126 }
127
128 fn audit_intent(&self) {
129 self.0.audit_intent();
130 }
131
132 fn read_cache_drained(&self) {
133 self.0.read_cache_drained();
134 }
135
136 fn physical_deleted(&self) {
137 self.0.physical_deleted();
138 }
139
140 fn counter_decremented(&self) {
141 self.0.counter_decremented();
142 }
143
144 fn notify_sent(&self) {
145 self.0.notify_sent();
146 }
147
148 fn audit_commit_failed(&self, err: &crate::BlockingSqliteError) {
149 self.0.audit_commit_failed(&format!("{err:?}"));
150 }
151
152 fn audit_commit_failed_event_logged(&self) {
153 self.0.audit_commit_failed_event_logged();
154 }
155
156 fn audit_commit_failed_event_failed(&self, err: &crate::BlockingSqliteError) {
157 self.0.audit_commit_failed_event_failed(&format!("{err:?}"));
158 }
159
160 fn audit_commit(&self) {
161 self.0.audit_commit();
162 }
163}
164
165impl Engine {
166 pub async fn replace_traced<H: EngineWriteTraceHooks + ?Sized>(
175 &self,
176 world: &ValidatedWorldPath,
177 representation: Representation,
178 preconditions: Preconditions,
179 tier: AccessTier,
180 hooks: &H,
181 ) -> Result<WriteResult, EngineError> {
182 EngineOps::new(self.core())
183 .replace(
184 world,
185 representation,
186 preconditions,
187 tier.into(),
188 &PublicWriteTrace(hooks),
189 )
190 .await
191 }
192
193 pub async fn append_traced<H: EngineWriteTraceHooks + ?Sized>(
199 &self,
200 world: &ValidatedWorldPath,
201 body: Bytes,
202 preconditions: Preconditions,
203 tier: AccessTier,
204 hooks: &H,
205 ) -> Result<WriteResult, EngineError> {
206 EngineOps::new(self.core())
207 .append(
208 world,
209 body,
210 preconditions,
211 tier.into(),
212 &PublicWriteTrace(hooks),
213 )
214 .await
215 }
216
217 pub async fn delete_traced<H: EngineDeleteTraceHooks + ?Sized>(
232 &self,
233 world: &ValidatedWorldPath,
234 metadata: DeleteMetadata,
235 preconditions: Preconditions,
236 tier: AccessTier,
237 hooks: &H,
238 ) -> Result<(), EngineError> {
239 let result = EngineOps::new(self.core())
240 .delete(
241 world,
242 DeleteRequest {
243 preconditions,
244 content_type: metadata.content_type,
245 headers: metadata.headers,
246 },
247 tier.into(),
248 &PublicDeleteTrace(hooks),
249 )
250 .await;
251 if let Err(delete_ops::DeleteError::AuditIntent { err, .. }) = &result {
252 hooks.audit_intent_failed(&format!("{err:?}"));
253 }
254 result.map_err(Into::into)
255 }
256}
257
258#[cfg(test)]
259mod tests {
260 use super::*;
261 use std::sync::Mutex;
262
263 struct Spy(Mutex<Vec<String>>);
264
265 impl Spy {
266 fn new() -> Self {
267 Self(Mutex::new(Vec::new()))
268 }
269
270 fn push(&self, value: impl Into<String>) {
271 self.0.lock().unwrap().push(value.into());
272 }
273
274 fn take(&self) -> Vec<String> {
275 std::mem::take(&mut *self.0.lock().unwrap())
276 }
277 }
278
279 impl EngineWriteTraceHooks for Spy {
280 fn lock_acquired(&self) {
281 self.push("lock_acquired");
282 }
283
284 fn quota_check(&self, used: usize, quota: usize) {
285 self.push(format!("quota_check:{used}:{quota}"));
286 }
287
288 fn sqlite_committed(&self, etag: &str) {
289 self.push(format!("sqlite_committed:{etag}"));
290 }
291
292 fn notify_sent(&self) {
293 self.push("notify_sent");
294 }
295 }
296
297 impl EngineDeleteTraceHooks for Spy {
298 fn lock_acquired(&self, world: &str) {
299 self.push(format!("lock_acquired:{world}"));
300 }
301
302 fn audit_commit_failed(&self, err: &str) {
303 self.push(format!("audit_commit_failed:{err}"));
304 }
305 }
306
307 #[test]
308 fn public_write_trace_forwards_internal_hook_boundaries() {
309 let spy = Spy::new();
310 let trace = PublicWriteTrace(&spy);
311
312 world_ops::WriteTraceHooks::lock_acquired(&trace);
313 world_ops::WriteTraceHooks::quota_check(&trace, 3, 5);
314 world_ops::WriteTraceHooks::sqlite_committed(&trace, "etag123");
315 world_ops::WriteTraceHooks::notify_sent(&trace);
316
317 assert_eq!(
318 spy.take(),
319 [
320 "lock_acquired",
321 "quota_check:3:5",
322 "sqlite_committed:etag123",
323 "notify_sent"
324 ]
325 );
326 }
327
328 #[test]
329 fn public_delete_trace_forwards_without_exposing_internal_error_type() {
330 let spy = Spy::new();
331 let trace = PublicDeleteTrace(&spy);
332
333 delete_ops::DeleteTraceHooks::lock_acquired(&trace, "home/task");
334 delete_ops::DeleteTraceHooks::audit_commit_failed(
335 &trace,
336 &crate::BlockingSqliteError::Worker,
337 );
338
339 assert_eq!(
340 spy.take(),
341 ["lock_acquired:home/task", "audit_commit_failed:Worker"]
342 );
343 }
344}