use crate::{models::*, Result};
use sqlx::{
    sqlite::{SqliteConnectOptions, SqlitePool},
    Pool, Sqlite,
};
use std::{collections::HashMap, path::Path, str::FromStr};
use tracing::{debug, info};
7
/// Handle to the recorder's SQLite store.
///
/// Cloning is cheap: the wrapped sqlx pool is reference-counted, so clones
/// share the same underlying connections.
#[derive(Clone)]
pub struct RecorderDatabase {
    /// Connection pool used by every query method.
    pool: Pool<Sqlite>,
}
13
14impl RecorderDatabase {
15 pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
17 let db_url = format!("sqlite:{}?mode=rwc", path.as_ref().display());
18 let pool = SqlitePool::connect(&db_url).await?;
19
20 let db = Self { pool };
21 db.initialize_schema().await?;
22
23 info!("Recorder database initialized at {:?}", path.as_ref());
24 Ok(db)
25 }
26
27 pub async fn new_in_memory() -> Result<Self> {
29 let pool = SqlitePool::connect("sqlite::memory:").await?;
30
31 let db = Self { pool };
32 db.initialize_schema().await?;
33
34 debug!("In-memory recorder database initialized");
35 Ok(db)
36 }
37
    /// Creates every table and index used by the recorder, idempotently
    /// (`IF NOT EXISTS` throughout), so it is safe to call on every startup.
    ///
    /// Tables: requests/responses (raw traffic), sync_snapshots (before/after
    /// diffs per sync cycle), behavioral_sequences / endpoint_probabilities /
    /// edge_case_patterns (behavioral-cloning models), flows / flow_steps
    /// (ordered request sequences), and scenarios (versioned JSON blobs).
    async fn initialize_schema(&self) -> Result<()> {
        // Core request log. Bodies are stored as TEXT with an explicit
        // encoding column ('utf8' default; 'base64' used for binary payloads).
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS requests (
                id TEXT PRIMARY KEY,
                protocol TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                method TEXT NOT NULL,
                path TEXT NOT NULL,
                query_params TEXT,
                headers TEXT NOT NULL,
                body TEXT,
                body_encoding TEXT NOT NULL DEFAULT 'utf8',
                client_ip TEXT,
                trace_id TEXT,
                span_id TEXT,
                duration_ms INTEGER,
                status_code INTEGER,
                tags TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // One response per request; CASCADE keeps responses in lockstep with
        // request deletion (requires foreign_keys to be ON for the connection).
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS responses (
                request_id TEXT PRIMARY KEY,
                status_code INTEGER NOT NULL,
                headers TEXT NOT NULL,
                body TEXT,
                body_encoding TEXT NOT NULL DEFAULT 'utf8',
                size_bytes INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (request_id) REFERENCES requests(id) ON DELETE CASCADE
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Indexes covering the query patterns used elsewhere in this module:
        // recency listing, and filtering by protocol/method/path/trace/status.
        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_requests_timestamp ON requests(timestamp DESC)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_protocol ON requests(protocol)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_method ON requests(method)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_path ON requests(path)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_trace_id ON requests(trace_id)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_status_code ON requests(status_code)")
            .execute(&self.pool)
            .await?;

        // Before/after observations captured while re-syncing against an
        // upstream; bodies are base64, headers/changes JSON (see
        // insert_sync_snapshot).
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS sync_snapshots (
                id TEXT PRIMARY KEY,
                endpoint TEXT NOT NULL,
                method TEXT NOT NULL,
                sync_cycle_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                before_status_code INTEGER NOT NULL,
                after_status_code INTEGER NOT NULL,
                before_body TEXT NOT NULL,
                after_body TEXT NOT NULL,
                before_headers TEXT NOT NULL,
                after_headers TEXT NOT NULL,
                response_time_before_ms INTEGER,
                response_time_after_ms INTEGER,
                changes_summary TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_sync_snapshots_endpoint ON sync_snapshots(endpoint, method, timestamp DESC)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_sync_snapshots_cycle ON sync_snapshots(sync_cycle_id)",
        )
        .execute(&self.pool)
        .await?;

        // Learned multi-step behavioral sequences (steps serialized as JSON).
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS behavioral_sequences (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                steps TEXT NOT NULL,
                frequency REAL NOT NULL,
                confidence REAL NOT NULL,
                learned_from TEXT,
                description TEXT,
                tags TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Per-(endpoint, method) probabilistic response models; distributions
        // are JSON blobs, keyed by the composite primary key.
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS endpoint_probabilities (
                endpoint TEXT NOT NULL,
                method TEXT NOT NULL,
                status_code_distribution TEXT NOT NULL,
                latency_distribution TEXT NOT NULL,
                error_patterns TEXT,
                payload_variations TEXT,
                sample_count INTEGER NOT NULL,
                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (endpoint, method)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Rare-response patterns that can be probability-amplified in replay.
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS edge_case_patterns (
                id TEXT PRIMARY KEY,
                endpoint TEXT NOT NULL,
                method TEXT NOT NULL,
                pattern_type TEXT NOT NULL,
                original_probability REAL NOT NULL,
                amplified_probability REAL,
                conditions TEXT,
                sample_responses TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_behavioral_sequences_name ON behavioral_sequences(name)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_endpoint_probabilities_endpoint ON endpoint_probabilities(endpoint, method)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_edge_case_patterns_endpoint ON edge_case_patterns(endpoint, method)",
        )
        .execute(&self.pool)
        .await?;

        // A flow is a named, ordered group of recorded requests.
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS flows (
                id TEXT PRIMARY KEY,
                name TEXT,
                description TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                tags TEXT, -- JSON array
                metadata TEXT -- JSON object
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Flow membership; cascades away with either its flow or its request.
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS flow_steps (
                flow_id TEXT NOT NULL,
                step_index INTEGER NOT NULL,
                request_id TEXT NOT NULL,
                step_label TEXT, -- e.g., "login", "list", "checkout"
                timing_ms INTEGER, -- delay from previous step
                FOREIGN KEY (flow_id) REFERENCES flows(id) ON DELETE CASCADE,
                FOREIGN KEY (request_id) REFERENCES requests(id) ON DELETE CASCADE,
                PRIMARY KEY (flow_id, step_index)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Versioned scenarios; (name, version) is unique, id is the PK.
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS scenarios (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                version TEXT NOT NULL,
                description TEXT,
                scenario_data TEXT NOT NULL, -- JSON serialized BehavioralScenario
                metadata TEXT, -- JSON object
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                tags TEXT, -- JSON array
                UNIQUE(name, version)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_flows_created_at ON flows(created_at DESC)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_flow_steps_flow_id ON flow_steps(flow_id)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_scenarios_name ON scenarios(name, version)")
            .execute(&self.pool)
            .await?;

        debug!("Database schema initialized");
        Ok(())
    }
296
    /// Persists one recorded request. `created_at` is filled by the schema
    /// default; all other columns come from the caller-provided struct.
    ///
    /// # Errors
    /// Fails if the insert violates a constraint (e.g. duplicate `id`) or the
    /// pool is unavailable.
    pub async fn insert_request(&self, request: &RecordedRequest) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO requests (
                id, protocol, timestamp, method, path, query_params,
                headers, body, body_encoding, client_ip, trace_id, span_id,
                duration_ms, status_code, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&request.id)
        .bind(request.protocol)
        .bind(request.timestamp)
        .bind(&request.method)
        .bind(&request.path)
        .bind(&request.query_params)
        .bind(&request.headers)
        .bind(&request.body)
        .bind(&request.body_encoding)
        .bind(&request.client_ip)
        .bind(&request.trace_id)
        .bind(&request.span_id)
        .bind(request.duration_ms)
        .bind(request.status_code)
        .bind(&request.tags)
        .execute(&self.pool)
        .await?;

        debug!("Recorded request: {} {} {}", request.protocol, request.method, request.path);
        Ok(())
    }
329
    /// Persists the response paired with an already-inserted request.
    ///
    /// `request_id` is both the primary key and a foreign key into
    /// `requests`, so the matching request must exist first.
    pub async fn insert_response(&self, response: &RecordedResponse) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO responses (
                request_id, status_code, headers, body, body_encoding,
                size_bytes, timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&response.request_id)
        .bind(response.status_code)
        .bind(&response.headers)
        .bind(&response.body)
        .bind(&response.body_encoding)
        .bind(response.size_bytes)
        .bind(response.timestamp)
        .execute(&self.pool)
        .await?;

        debug!("Recorded response for request: {}", response.request_id);
        Ok(())
    }
353
354 pub async fn get_request(&self, id: &str) -> Result<Option<RecordedRequest>> {
356 let request = sqlx::query_as::<_, RecordedRequest>(
357 r#"
358 SELECT id, protocol, timestamp, method, path, query_params,
359 headers, body, body_encoding, client_ip, trace_id, span_id,
360 duration_ms, status_code, tags
361 FROM requests WHERE id = ?
362 "#,
363 )
364 .bind(id)
365 .fetch_optional(&self.pool)
366 .await?;
367
368 Ok(request)
369 }
370
371 pub async fn get_response(&self, request_id: &str) -> Result<Option<RecordedResponse>> {
373 let response = sqlx::query_as::<_, RecordedResponse>(
374 r#"
375 SELECT request_id, status_code, headers, body, body_encoding,
376 size_bytes, timestamp
377 FROM responses WHERE request_id = ?
378 "#,
379 )
380 .bind(request_id)
381 .fetch_optional(&self.pool)
382 .await?;
383
384 Ok(response)
385 }
386
387 pub async fn get_exchange(&self, id: &str) -> Result<Option<RecordedExchange>> {
389 let request = self.get_request(id).await?;
390 if let Some(request) = request {
391 let response = self.get_response(id).await?;
392 Ok(Some(RecordedExchange { request, response }))
393 } else {
394 Ok(None)
395 }
396 }
397
    /// Overwrites the stored response for `request_id` in place.
    ///
    /// Note: `body_encoding` is forced to 'base64' — callers are expected to
    /// pass `body` already base64-encoded. The response timestamp is reset to
    /// the current time. A missing `request_id` is not an error: the UPDATE
    /// simply affects zero rows.
    pub async fn update_response(
        &self,
        request_id: &str,
        status_code: i32,
        headers: &str,
        body: &str,
        size_bytes: i64,
    ) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE responses
            SET status_code = ?,
                headers = ?,
                body = ?,
                body_encoding = 'base64',
                size_bytes = ?,
                timestamp = datetime('now')
            WHERE request_id = ?
            "#,
        )
        .bind(status_code)
        .bind(headers)
        .bind(body)
        .bind(size_bytes)
        .bind(request_id)
        .execute(&self.pool)
        .await?;

        debug!("Updated response for request {}", request_id);
        Ok(())
    }
430
431 pub async fn list_recent(&self, limit: i32) -> Result<Vec<RecordedRequest>> {
433 let requests = sqlx::query_as::<_, RecordedRequest>(
434 r#"
435 SELECT id, protocol, timestamp, method, path, query_params,
436 headers, body, body_encoding, client_ip, trace_id, span_id,
437 duration_ms, status_code, tags
438 FROM requests
439 ORDER BY timestamp DESC
440 LIMIT ?
441 "#,
442 )
443 .bind(limit)
444 .fetch_all(&self.pool)
445 .await?;
446
447 Ok(requests)
448 }
449
    /// Deletes requests older than `days` days, returning the number of rows
    /// removed.
    ///
    /// The bound value becomes e.g. "-30 days", a SQLite datetime modifier.
    /// Associated `responses` rows are removed via the schema's ON DELETE
    /// CASCADE (effective only when foreign-key enforcement is enabled on the
    /// connection).
    pub async fn delete_older_than(&self, days: i64) -> Result<u64> {
        let result = sqlx::query(
            r#"
            DELETE FROM requests
            WHERE timestamp < datetime('now', ? || ' days')
            "#,
        )
        .bind(format!("-{}", days))
        .execute(&self.pool)
        .await?;

        info!("Deleted {} old requests", result.rows_affected());
        Ok(result.rows_affected())
    }
465
466 pub async fn get_stats(&self) -> Result<DatabaseStats> {
468 let total_requests: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM requests")
469 .fetch_one(&self.pool)
470 .await?;
471
472 let total_responses: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM responses")
473 .fetch_one(&self.pool)
474 .await?;
475
476 let total_size: i64 =
477 sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM responses")
478 .fetch_one(&self.pool)
479 .await?;
480
481 Ok(DatabaseStats {
482 total_requests,
483 total_responses,
484 total_size_bytes: total_size,
485 })
486 }
487
    /// Computes a detailed breakdown of recorded traffic: total count,
    /// per-protocol counts, per-status-code counts, and average duration.
    pub async fn get_statistics(&self) -> Result<DetailedStats> {
        let total_requests: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM requests")
            .fetch_one(&self.pool)
            .await?;

        // Requests grouped by protocol.
        let protocol_rows: Vec<(String, i64)> =
            sqlx::query_as("SELECT protocol, COUNT(*) as count FROM requests GROUP BY protocol")
                .fetch_all(&self.pool)
                .await?;

        let by_protocol: HashMap<String, i64> = protocol_rows.into_iter().collect();

        // Requests grouped by status code; rows with no recorded status are excluded.
        let status_rows: Vec<(i32, i64)> = sqlx::query_as(
            "SELECT status_code, COUNT(*) as count FROM requests WHERE status_code IS NOT NULL GROUP BY status_code"
        )
        .fetch_all(&self.pool)
        .await?;

        let by_status_code: HashMap<i32, i64> = status_rows.into_iter().collect();

        // AVG over an empty set yields SQL NULL, hence Option<f64>.
        let avg_duration: Option<f64> = sqlx::query_scalar(
            "SELECT AVG(duration_ms) FROM requests WHERE duration_ms IS NOT NULL",
        )
        .fetch_one(&self.pool)
        .await?;

        Ok(DetailedStats {
            total_requests,
            by_protocol,
            by_status_code,
            avg_duration_ms: avg_duration,
        })
    }
525
    /// Deletes every recorded request and response.
    ///
    /// Responses are deleted first so the operation also succeeds on
    /// connections where foreign-key enforcement is disabled and the CASCADE
    /// would not fire.
    pub async fn clear_all(&self) -> Result<()> {
        sqlx::query("DELETE FROM responses").execute(&self.pool).await?;
        sqlx::query("DELETE FROM requests").execute(&self.pool).await?;
        info!("Cleared all recordings");
        Ok(())
    }
533
    /// Consumes the handle and closes the underlying connection pool,
    /// waiting for in-flight connections to be released.
    pub async fn close(self) {
        self.pool.close().await;
        debug!("Recorder database connection closed");
    }
539
    /// Persists one before/after sync snapshot.
    ///
    /// Headers and the changes summary are serialized to JSON; the raw
    /// before/after bodies are base64-encoded so binary payloads survive the
    /// TEXT columns. Timestamps are stored as RFC 3339 strings.
    pub async fn insert_sync_snapshot(
        &self,
        snapshot: &crate::sync_snapshots::SyncSnapshot,
    ) -> Result<()> {
        let before_headers_json = serde_json::to_string(&snapshot.before.headers)?;
        let after_headers_json = serde_json::to_string(&snapshot.after.headers)?;
        let before_body_encoded = base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            &snapshot.before.body,
        );
        let after_body_encoded = base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            &snapshot.after.body,
        );
        let changes_summary_json = serde_json::to_string(&snapshot.changes)?;

        sqlx::query(
            r#"
            INSERT INTO sync_snapshots (
                id, endpoint, method, sync_cycle_id, timestamp,
                before_status_code, after_status_code,
                before_body, after_body,
                before_headers, after_headers,
                response_time_before_ms, response_time_after_ms,
                changes_summary
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&snapshot.id)
        .bind(&snapshot.endpoint)
        .bind(&snapshot.method)
        .bind(&snapshot.sync_cycle_id)
        .bind(snapshot.timestamp.to_rfc3339())
        .bind(snapshot.before.status_code as i32)
        .bind(snapshot.after.status_code as i32)
        .bind(&before_body_encoded)
        .bind(&after_body_encoded)
        .bind(&before_headers_json)
        .bind(&after_headers_json)
        .bind(snapshot.response_time_before.map(|v| v as i64))
        .bind(snapshot.response_time_after.map(|v| v as i64))
        .bind(&changes_summary_json)
        .execute(&self.pool)
        .await?;

        debug!(
            "Inserted sync snapshot: {} for {} {}",
            snapshot.id, snapshot.method, snapshot.endpoint
        );
        Ok(())
    }
592
    /// Retrieves stored sync snapshots, newest first.
    ///
    /// Selection rules:
    /// - empty `endpoint`: all snapshots (method filter is ignored);
    /// - `endpoint` + `method`: snapshots for that exact pair;
    /// - `endpoint` only: snapshots for the endpoint across all methods.
    ///
    /// `limit` defaults to 100. Rows are decoded back into domain snapshots
    /// via `SyncSnapshotRow::to_snapshot`, which can fail on corrupt rows.
    pub async fn get_snapshots_for_endpoint(
        &self,
        endpoint: &str,
        method: Option<&str>,
        limit: Option<i32>,
    ) -> Result<Vec<crate::sync_snapshots::SyncSnapshot>> {
        let limit = limit.unwrap_or(100);

        // Three static query variants rather than dynamic SQL, so every
        // placeholder stays positionally bound.
        let query = if endpoint.is_empty() {
            sqlx::query_as::<_, SyncSnapshotRow>(
                r#"
                SELECT id, endpoint, method, sync_cycle_id, timestamp,
                       before_status_code, after_status_code,
                       before_body, after_body,
                       before_headers, after_headers,
                       response_time_before_ms, response_time_after_ms,
                       changes_summary
                FROM sync_snapshots
                ORDER BY timestamp DESC
                LIMIT ?
                "#,
            )
            .bind(limit)
        } else if let Some(method) = method {
            sqlx::query_as::<_, SyncSnapshotRow>(
                r#"
                SELECT id, endpoint, method, sync_cycle_id, timestamp,
                       before_status_code, after_status_code,
                       before_body, after_body,
                       before_headers, after_headers,
                       response_time_before_ms, response_time_after_ms,
                       changes_summary
                FROM sync_snapshots
                WHERE endpoint = ? AND method = ?
                ORDER BY timestamp DESC
                LIMIT ?
                "#,
            )
            .bind(endpoint)
            .bind(method)
            .bind(limit)
        } else {
            sqlx::query_as::<_, SyncSnapshotRow>(
                r#"
                SELECT id, endpoint, method, sync_cycle_id, timestamp,
                       before_status_code, after_status_code,
                       before_body, after_body,
                       before_headers, after_headers,
                       response_time_before_ms, response_time_after_ms,
                       changes_summary
                FROM sync_snapshots
                WHERE endpoint = ?
                ORDER BY timestamp DESC
                LIMIT ?
                "#,
            )
            .bind(endpoint)
            .bind(limit)
        };

        let rows = query.fetch_all(&self.pool).await?;

        let mut snapshots = Vec::new();
        for row in rows {
            snapshots.push(row.to_snapshot()?);
        }

        Ok(snapshots)
    }
664
665 pub async fn get_snapshots_by_cycle(
667 &self,
668 sync_cycle_id: &str,
669 ) -> Result<Vec<crate::sync_snapshots::SyncSnapshot>> {
670 let rows = sqlx::query_as::<_, SyncSnapshotRow>(
671 r#"
672 SELECT id, endpoint, method, sync_cycle_id, timestamp,
673 before_status_code, after_status_code,
674 before_body, after_body,
675 before_headers, after_headers,
676 response_time_before_ms, response_time_after_ms,
677 changes_summary
678 FROM sync_snapshots
679 WHERE sync_cycle_id = ?
680 ORDER BY timestamp DESC
681 "#,
682 )
683 .bind(sync_cycle_id)
684 .fetch_all(&self.pool)
685 .await?;
686
687 let mut snapshots = Vec::new();
688 for row in rows {
689 snapshots.push(row.to_snapshot()?);
690 }
691
692 Ok(snapshots)
693 }
694
695 pub async fn delete_old_snapshots(&self, keep_per_endpoint: i32) -> Result<u64> {
697 let result = sqlx::query(
700 r#"
701 DELETE FROM sync_snapshots
702 WHERE id NOT IN (
703 SELECT id FROM sync_snapshots
704 ORDER BY timestamp DESC
705 LIMIT (
706 SELECT COUNT(*) FROM (
707 SELECT DISTINCT endpoint || '|' || method FROM sync_snapshots
708 )
709 ) * ?
710 )
711 "#,
712 )
713 .bind(keep_per_endpoint)
714 .execute(&self.pool)
715 .await?;
716
717 info!(
718 "Deleted {} old snapshots (kept {} per endpoint)",
719 result.rows_affected(),
720 keep_per_endpoint
721 );
722 Ok(result.rows_affected())
723 }
724
    /// Upserts a learned behavioral sequence (INSERT OR REPLACE keyed on id).
    ///
    /// `steps`, `learned_from`, and `tags` are stored as JSON strings.
    pub async fn insert_behavioral_sequence(
        &self,
        sequence: &mockforge_core::behavioral_cloning::BehavioralSequence,
    ) -> Result<()> {
        let steps_json = serde_json::to_string(&sequence.steps)?;
        let learned_from_json = serde_json::to_string(&sequence.learned_from)?;
        let tags_json = serde_json::to_string(&sequence.tags)?;

        sqlx::query(
            r#"
            INSERT OR REPLACE INTO behavioral_sequences (
                id, name, steps, frequency, confidence, learned_from, description, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&sequence.id)
        .bind(&sequence.name)
        .bind(&steps_json)
        .bind(sequence.frequency)
        .bind(sequence.confidence)
        .bind(&learned_from_json)
        .bind(&sequence.description)
        .bind(&tags_json)
        .execute(&self.pool)
        .await?;

        debug!("Inserted behavioral sequence: {}", sequence.id);
        Ok(())
    }
755
    /// Loads all stored behavioral sequences, highest frequency (then
    /// confidence) first, deserializing the JSON columns back into structs.
    ///
    /// NOTE(review): `steps` uses `?` (a corrupt row is a hard error) while
    /// `learned_from`/`tags` fall back to defaults on parse failure — confirm
    /// this asymmetry is intentional.
    pub async fn get_behavioral_sequences(
        &self,
    ) -> Result<Vec<mockforge_core::behavioral_cloning::BehavioralSequence>> {
        let rows = sqlx::query(
            r#"
            SELECT id, name, steps, frequency, confidence, learned_from, description, tags
            FROM behavioral_sequences
            ORDER BY frequency DESC, confidence DESC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        let mut sequences = Vec::new();
        for row in rows {
            use sqlx::Row;
            let steps_json: String = row.try_get("steps")?;
            let learned_from_json: String = row.try_get("learned_from")?;
            let tags_json: String = row.try_get("tags")?;

            sequences.push(mockforge_core::behavioral_cloning::BehavioralSequence {
                id: row.try_get("id")?,
                name: row.try_get("name")?,
                steps: serde_json::from_str(&steps_json)?,
                frequency: row.try_get("frequency")?,
                confidence: row.try_get("confidence")?,
                learned_from: serde_json::from_str(&learned_from_json).unwrap_or_default(),
                description: row.try_get("description")?,
                tags: serde_json::from_str(&tags_json).unwrap_or_default(),
            });
        }

        Ok(sequences)
    }
791
    /// Upserts the probability model for one (endpoint, method) pair — the
    /// table's composite primary key makes INSERT OR REPLACE a true upsert.
    /// Distributions and patterns are stored as JSON strings.
    pub async fn insert_endpoint_probability_model(
        &self,
        model: &mockforge_core::behavioral_cloning::EndpointProbabilityModel,
    ) -> Result<()> {
        let status_code_dist_json = serde_json::to_string(&model.status_code_distribution)?;
        let latency_dist_json = serde_json::to_string(&model.latency_distribution)?;
        let error_patterns_json = serde_json::to_string(&model.error_patterns)?;
        let payload_variations_json = serde_json::to_string(&model.payload_variations)?;

        sqlx::query(
            r#"
            INSERT OR REPLACE INTO endpoint_probabilities (
                endpoint, method, status_code_distribution, latency_distribution,
                error_patterns, payload_variations, sample_count, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&model.endpoint)
        .bind(&model.method)
        .bind(&status_code_dist_json)
        .bind(&latency_dist_json)
        .bind(&error_patterns_json)
        .bind(&payload_variations_json)
        .bind(model.sample_count as i64)
        .bind(model.updated_at)
        .execute(&self.pool)
        .await?;

        debug!("Inserted probability model: {} {}", model.method, model.endpoint);
        Ok(())
    }
824
    /// Loads the probability model for one (endpoint, method) pair, or `None`
    /// when no model has been stored yet.
    ///
    /// Mandatory distributions fail hard on corrupt JSON (`?`); optional
    /// pattern fields fall back to defaults. `original_error_probabilities`
    /// is not persisted and is always reloaded as `None`.
    pub async fn get_endpoint_probability_model(
        &self,
        endpoint: &str,
        method: &str,
    ) -> Result<Option<mockforge_core::behavioral_cloning::EndpointProbabilityModel>> {
        let row = sqlx::query(
            r#"
            SELECT endpoint, method, status_code_distribution, latency_distribution,
                   error_patterns, payload_variations, sample_count, updated_at
            FROM endpoint_probabilities
            WHERE endpoint = ? AND method = ?
            "#,
        )
        .bind(endpoint)
        .bind(method)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            use sqlx::Row;
            let status_code_dist_json: String = row.try_get("status_code_distribution")?;
            let latency_dist_json: String = row.try_get("latency_distribution")?;
            let error_patterns_json: String = row.try_get("error_patterns")?;
            let payload_variations_json: String = row.try_get("payload_variations")?;

            Ok(Some(mockforge_core::behavioral_cloning::EndpointProbabilityModel {
                endpoint: row.try_get("endpoint")?,
                method: row.try_get("method")?,
                status_code_distribution: serde_json::from_str(&status_code_dist_json)?,
                latency_distribution: serde_json::from_str(&latency_dist_json)?,
                error_patterns: serde_json::from_str(&error_patterns_json).unwrap_or_default(),
                payload_variations: serde_json::from_str(&payload_variations_json)
                    .unwrap_or_default(),
                // Stored as INTEGER; widen back to the in-memory u64.
                sample_count: row.try_get::<i64, _>("sample_count")? as u64,
                updated_at: row.try_get("updated_at")?,
                original_error_probabilities: None,
            }))
        } else {
            Ok(None)
        }
    }
867
    /// Loads every stored probability model, most-sampled first. Row decoding
    /// follows the same rules as `get_endpoint_probability_model`.
    pub async fn get_all_endpoint_probability_models(
        &self,
    ) -> Result<Vec<mockforge_core::behavioral_cloning::EndpointProbabilityModel>> {
        let rows = sqlx::query(
            r#"
            SELECT endpoint, method, status_code_distribution, latency_distribution,
                   error_patterns, payload_variations, sample_count, updated_at
            FROM endpoint_probabilities
            ORDER BY sample_count DESC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        let mut models = Vec::new();
        for row in rows {
            use sqlx::Row;
            let status_code_dist_json: String = row.try_get("status_code_distribution")?;
            let latency_dist_json: String = row.try_get("latency_distribution")?;
            let error_patterns_json: String = row.try_get("error_patterns")?;
            let payload_variations_json: String = row.try_get("payload_variations")?;

            models.push(mockforge_core::behavioral_cloning::EndpointProbabilityModel {
                endpoint: row.try_get("endpoint")?,
                method: row.try_get("method")?,
                status_code_distribution: serde_json::from_str(&status_code_dist_json)?,
                latency_distribution: serde_json::from_str(&latency_dist_json)?,
                error_patterns: serde_json::from_str(&error_patterns_json).unwrap_or_default(),
                payload_variations: serde_json::from_str(&payload_variations_json)
                    .unwrap_or_default(),
                sample_count: row.try_get::<i64, _>("sample_count")? as u64,
                // Not persisted; always reloaded as None.
                original_error_probabilities: None,
                updated_at: row.try_get("updated_at")?,
            });
        }

        Ok(models)
    }
907
908 pub async fn get_requests_by_trace(
910 &self,
911 min_requests_per_trace: Option<i32>,
912 ) -> Result<Vec<(String, Vec<RecordedRequest>)>> {
913 let requests = sqlx::query_as::<_, RecordedRequest>(
915 r#"
916 SELECT id, protocol, timestamp, method, path, query_params,
917 headers, body, body_encoding, client_ip, trace_id, span_id,
918 duration_ms, status_code, tags
919 FROM requests
920 WHERE trace_id IS NOT NULL AND trace_id != ''
921 ORDER BY trace_id, timestamp ASC
922 "#,
923 )
924 .fetch_all(&self.pool)
925 .await?;
926
927 let mut grouped: std::collections::HashMap<String, Vec<RecordedRequest>> =
929 std::collections::HashMap::new();
930 for request in requests {
931 if let Some(trace_id) = &request.trace_id {
932 grouped.entry(trace_id.clone()).or_insert_with(Vec::new).push(request);
933 }
934 }
935
936 let mut result: Vec<(String, Vec<RecordedRequest>)> = grouped
938 .into_iter()
939 .filter(|(_, requests)| {
940 min_requests_per_trace.map_or(true, |min| requests.len() >= min as usize)
941 })
942 .collect();
943
944 result.sort_by_key(|(trace_id, _)| trace_id.clone());
946
947 Ok(result)
948 }
949
    /// Returns up to `limit` (default 10000) request/response pairs for an
    /// exact (path, method) match, newest request first. Requests without a
    /// captured response are paired with `None`.
    ///
    /// NOTE(review): this issues one response lookup per request (N+1
    /// pattern); acceptable for local SQLite, but a JOIN would be cheaper if
    /// this shows up in profiles.
    pub async fn get_exchanges_for_endpoint(
        &self,
        endpoint: &str,
        method: &str,
        limit: Option<i32>,
    ) -> Result<Vec<(RecordedRequest, Option<RecordedResponse>)>> {
        let limit = limit.unwrap_or(10000);
        let requests = sqlx::query_as::<_, RecordedRequest>(
            r#"
            SELECT id, protocol, timestamp, method, path, query_params,
                   headers, body, body_encoding, client_ip, trace_id, span_id,
                   duration_ms, status_code, tags
            FROM requests
            WHERE path = ? AND method = ?
            ORDER BY timestamp DESC
            LIMIT ?
            "#,
        )
        .bind(endpoint)
        .bind(method)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        let mut exchanges = Vec::new();
        for request in requests {
            let response = self.get_response(&request.id).await?;
            exchanges.push((request, response));
        }

        Ok(exchanges)
    }
985
    /// Creates a new, empty flow record. `tags` is stored as a JSON array and
    /// `metadata` starts as an empty JSON object; `created_at` is set to the
    /// current UTC time in RFC 3339 form.
    pub async fn create_flow(
        &self,
        flow_id: &str,
        name: Option<&str>,
        description: Option<&str>,
        tags: &[String],
    ) -> Result<()> {
        let tags_json = serde_json::to_string(tags)?;
        // Seed metadata with an empty object so the column is valid JSON.
        let metadata_json =
            serde_json::to_string(&std::collections::HashMap::<String, serde_json::Value>::new())?;

        sqlx::query(
            r#"
            INSERT INTO flows (id, name, description, created_at, tags, metadata)
            VALUES (?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(flow_id)
        .bind(name)
        .bind(description)
        .bind(chrono::Utc::now().to_rfc3339())
        .bind(&tags_json)
        .bind(&metadata_json)
        .execute(&self.pool)
        .await?;

        debug!("Created flow: {}", flow_id);
        Ok(())
    }
1016
    /// Appends a step to a flow at `step_index` (the (flow_id, step_index)
    /// primary key rejects duplicates). `request_id` must reference an
    /// existing recorded request; `timing_ms` is the delay from the previous
    /// step.
    pub async fn add_flow_step(
        &self,
        flow_id: &str,
        request_id: &str,
        step_index: usize,
        step_label: Option<&str>,
        timing_ms: Option<u64>,
    ) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO flow_steps (flow_id, step_index, request_id, step_label, timing_ms)
            VALUES (?, ?, ?, ?, ?)
            "#,
        )
        .bind(flow_id)
        // SQLite stores signed integers; widen usize/u64 to i64 for binding.
        .bind(step_index as i64)
        .bind(request_id)
        .bind(step_label)
        .bind(timing_ms.map(|t| t as i64))
        .execute(&self.pool)
        .await?;

        debug!("Added step {} to flow {}", step_index, flow_id);
        Ok(())
    }
1043
1044 pub async fn get_flow_step_count(&self, flow_id: &str) -> Result<usize> {
1046 let count: Option<i64> =
1047 sqlx::query_scalar("SELECT COUNT(*) FROM flow_steps WHERE flow_id = ?")
1048 .bind(flow_id)
1049 .fetch_optional(&self.pool)
1050 .await?;
1051
1052 Ok(count.unwrap_or(0) as usize)
1053 }
1054
1055 pub async fn get_flow_steps(&self, flow_id: &str) -> Result<Vec<FlowStepRow>> {
1057 let rows = sqlx::query_as::<_, FlowStepRow>(
1058 r#"
1059 SELECT request_id, step_index, step_label, timing_ms
1060 FROM flow_steps
1061 WHERE flow_id = ?
1062 ORDER BY step_index ASC
1063 "#,
1064 )
1065 .bind(flow_id)
1066 .fetch_all(&self.pool)
1067 .await?;
1068
1069 Ok(rows)
1070 }
1071
1072 pub async fn get_flow_metadata(&self, flow_id: &str) -> Result<Option<FlowMetadataRow>> {
1074 let row = sqlx::query_as::<_, FlowMetadataRow>(
1075 r#"
1076 SELECT id, name, description, created_at, tags, metadata
1077 FROM flows
1078 WHERE id = ?
1079 "#,
1080 )
1081 .bind(flow_id)
1082 .fetch_optional(&self.pool)
1083 .await?;
1084
1085 Ok(row)
1086 }
1087
1088 pub async fn list_flows(&self, limit: Option<i64>) -> Result<Vec<FlowMetadataRow>> {
1090 let limit = limit.unwrap_or(100);
1091 let rows = sqlx::query_as::<_, FlowMetadataRow>(
1092 r#"
1093 SELECT id, name, description, created_at, tags, metadata
1094 FROM flows
1095 ORDER BY created_at DESC
1096 LIMIT ?
1097 "#,
1098 )
1099 .bind(limit)
1100 .fetch_all(&self.pool)
1101 .await?;
1102
1103 Ok(rows)
1104 }
1105
    /// Partially updates a flow's metadata.
    ///
    /// `name` and `description` only overwrite when `Some` (COALESCE keeps
    /// the stored value on NULL bind); `tags`, when provided, always replaces
    /// the stored JSON array. Two query variants are used because a NULL tags
    /// bind cannot be distinguished from "leave unchanged" in a single
    /// statement.
    pub async fn update_flow_metadata(
        &self,
        flow_id: &str,
        name: Option<&str>,
        description: Option<&str>,
        tags: Option<&[String]>,
    ) -> Result<()> {
        if let Some(tags) = tags {
            let tags_json = serde_json::to_string(tags)?;
            sqlx::query(
                r#"
                UPDATE flows
                SET name = COALESCE(?, name),
                    description = COALESCE(?, description),
                    tags = ?
                WHERE id = ?
                "#,
            )
            .bind(name)
            .bind(description)
            .bind(&tags_json)
            .bind(flow_id)
            .execute(&self.pool)
            .await?;
        } else {
            sqlx::query(
                r#"
                UPDATE flows
                SET name = COALESCE(?, name),
                    description = COALESCE(?, description)
                WHERE id = ?
                "#,
            )
            .bind(name)
            .bind(description)
            .bind(flow_id)
            .execute(&self.pool)
            .await?;
        }

        info!("Updated flow metadata: {}", flow_id);
        Ok(())
    }
1150
    /// Deletes a flow by id.
    ///
    /// Its `flow_steps` rows are removed by the schema's ON DELETE CASCADE —
    /// this only takes effect when foreign-key enforcement is enabled on the
    /// connection (SQLite default is OFF).
    pub async fn delete_flow(&self, flow_id: &str) -> Result<()> {
        sqlx::query("DELETE FROM flows WHERE id = ?")
            .bind(flow_id)
            .execute(&self.pool)
            .await?;

        info!("Deleted flow: {}", flow_id);
        Ok(())
    }
1161
    /// Upserts a behavioral scenario under the given version.
    ///
    /// The whole scenario is serialized into `scenario_data`; metadata and
    /// tags are additionally stored in their own JSON columns for listing.
    /// INSERT OR REPLACE keyed on id makes re-storing the same scenario an
    /// update; `updated_at` is refreshed to now.
    pub async fn store_scenario(
        &self,
        scenario: &crate::behavioral_cloning::BehavioralScenario,
        version: &str,
    ) -> Result<()> {
        let scenario_json = serde_json::to_string(scenario)?;
        let metadata_json = serde_json::to_string(&scenario.metadata)?;
        let tags_json = serde_json::to_string(&scenario.tags)?;

        sqlx::query(
            r#"
            INSERT OR REPLACE INTO scenarios (
                id, name, version, description, scenario_data, metadata, updated_at, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&scenario.id)
        .bind(&scenario.name)
        .bind(version)
        .bind(&scenario.description)
        .bind(&scenario_json)
        .bind(&metadata_json)
        .bind(chrono::Utc::now().to_rfc3339())
        .bind(&tags_json)
        .execute(&self.pool)
        .await?;

        debug!("Stored scenario: {} v{}", scenario.id, version);
        Ok(())
    }
1193
    /// Loads a scenario by id, deserializing the stored JSON blob.
    ///
    /// NOTE(review): `ORDER BY version DESC` sorts the TEXT column
    /// lexicographically, so "10" sorts before "9"; with id as the primary
    /// key there should be at most one row per id anyway — confirm which
    /// invariant holds.
    pub async fn get_scenario(
        &self,
        scenario_id: &str,
    ) -> Result<Option<crate::behavioral_cloning::BehavioralScenario>> {
        let row = sqlx::query(
            r#"
            SELECT scenario_data
            FROM scenarios
            WHERE id = ?
            ORDER BY version DESC
            LIMIT 1
            "#,
        )
        .bind(scenario_id)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            use sqlx::Row;
            let scenario_json: String = row.try_get("scenario_data")?;
            let scenario: crate::behavioral_cloning::BehavioralScenario =
                serde_json::from_str(&scenario_json)?;
            Ok(Some(scenario))
        } else {
            Ok(None)
        }
    }
1222
1223 pub async fn get_scenario_by_name_version(
1225 &self,
1226 name: &str,
1227 version: &str,
1228 ) -> Result<Option<crate::behavioral_cloning::BehavioralScenario>> {
1229 let row = sqlx::query(
1230 r#"
1231 SELECT scenario_data
1232 FROM scenarios
1233 WHERE name = ? AND version = ?
1234 "#,
1235 )
1236 .bind(name)
1237 .bind(version)
1238 .fetch_optional(&self.pool)
1239 .await?;
1240
1241 if let Some(row) = row {
1242 use sqlx::Row;
1243 let scenario_json: String = row.try_get("scenario_data")?;
1244 let scenario: crate::behavioral_cloning::BehavioralScenario =
1245 serde_json::from_str(&scenario_json)?;
1246 Ok(Some(scenario))
1247 } else {
1248 Ok(None)
1249 }
1250 }
1251
1252 pub async fn list_scenarios(&self, limit: Option<i64>) -> Result<Vec<ScenarioMetadataRow>> {
1254 let limit = limit.unwrap_or(100);
1255 let rows = sqlx::query_as::<_, ScenarioMetadataRow>(
1256 r#"
1257 SELECT id, name, version, description, created_at, updated_at, tags
1258 FROM scenarios
1259 ORDER BY name, version DESC
1260 LIMIT ?
1261 "#,
1262 )
1263 .bind(limit)
1264 .fetch_all(&self.pool)
1265 .await?;
1266
1267 Ok(rows)
1268 }
1269
1270 pub async fn delete_scenario(&self, scenario_id: &str) -> Result<()> {
1272 sqlx::query("DELETE FROM scenarios WHERE id = ?")
1273 .bind(scenario_id)
1274 .execute(&self.pool)
1275 .await?;
1276
1277 info!("Deleted scenario: {}", scenario_id);
1278 Ok(())
1279 }
1280}
1281
1282#[derive(Debug, Clone, sqlx::FromRow)]
1284pub struct FlowStepRow {
1285 pub request_id: String,
1286 #[sqlx(rename = "step_index")]
1287 pub step_index: i64,
1288 pub step_label: Option<String>,
1289 pub timing_ms: Option<i64>,
1290}
1291
1292#[derive(Debug, Clone, sqlx::FromRow)]
1294pub struct FlowMetadataRow {
1295 pub id: String,
1296 pub name: Option<String>,
1297 pub description: Option<String>,
1298 #[sqlx(rename = "created_at")]
1299 pub created_at: String,
1300 pub tags: String,
1301 pub metadata: String,
1302}
1303
1304#[derive(Debug, Clone, sqlx::FromRow)]
1306pub struct ScenarioMetadataRow {
1307 pub id: String,
1308 pub name: String,
1309 pub version: String,
1310 pub description: Option<String>,
1311 #[sqlx(rename = "created_at")]
1312 pub created_at: String,
1313 #[sqlx(rename = "updated_at")]
1314 pub updated_at: String,
1315 pub tags: String,
1316}
1317
/// Raw, undecoded row shape for sync snapshots.
///
/// Body columns hold base64 text and header/changes columns hold JSON text;
/// `to_snapshot` performs all decoding.
#[derive(Debug)]
struct SyncSnapshotRow {
    id: String,
    endpoint: String,
    method: String,
    sync_cycle_id: String,
    // RFC 3339 timestamp string (parsed in `to_snapshot`).
    timestamp: String,
    before_status_code: i32,
    after_status_code: i32,
    // Base64-encoded body captured before the sync cycle.
    before_body: String,
    // Base64-encoded body captured after the sync cycle.
    after_body: String,
    // JSON-encoded map of header name -> value.
    before_headers: String,
    after_headers: String,
    response_time_before_ms: Option<i64>,
    response_time_after_ms: Option<i64>,
    // JSON-serialized `crate::diff::ComparisonResult`.
    changes_summary: String,
}
1336
1337impl SyncSnapshotRow {
1338 fn to_snapshot(&self) -> Result<crate::sync_snapshots::SyncSnapshot> {
1339 use crate::sync_snapshots::{SnapshotData, SyncSnapshot};
1340 use std::collections::HashMap;
1341
1342 let timestamp = chrono::DateTime::parse_from_rfc3339(&self.timestamp)
1343 .map_err(|e| crate::RecorderError::InvalidFilter(format!("Invalid timestamp: {}", e)))?
1344 .with_timezone(&chrono::Utc);
1345
1346 let before_headers: HashMap<String, String> = serde_json::from_str(&self.before_headers)?;
1347 let after_headers: HashMap<String, String> = serde_json::from_str(&self.after_headers)?;
1348 let changes: crate::diff::ComparisonResult = serde_json::from_str(&self.changes_summary)?;
1349
1350 let before_body = base64::Engine::decode(
1351 &base64::engine::general_purpose::STANDARD,
1352 &self.before_body,
1353 )
1354 .map_err(|e| crate::RecorderError::InvalidFilter(format!("Invalid base64: {}", e)))?;
1355
1356 let after_body =
1357 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &self.after_body)
1358 .map_err(|e| {
1359 crate::RecorderError::InvalidFilter(format!("Invalid base64: {}", e))
1360 })?;
1361
1362 let before_body_json = serde_json::from_slice(&before_body).ok();
1363 let after_body_json = serde_json::from_slice(&after_body).ok();
1364
1365 Ok(SyncSnapshot {
1366 id: self.id.clone(),
1367 endpoint: self.endpoint.clone(),
1368 method: self.method.clone(),
1369 sync_cycle_id: self.sync_cycle_id.clone(),
1370 timestamp,
1371 before: SnapshotData {
1372 status_code: self.before_status_code as u16,
1373 headers: before_headers,
1374 body: before_body,
1375 body_json: before_body_json,
1376 },
1377 after: SnapshotData {
1378 status_code: self.after_status_code as u16,
1379 headers: after_headers,
1380 body: after_body,
1381 body_json: after_body_json,
1382 },
1383 changes,
1384 response_time_before: self.response_time_before_ms.map(|v| v as u64),
1385 response_time_after: self.response_time_after_ms.map(|v| v as u64),
1386 })
1387 }
1388}
1389
1390impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for SyncSnapshotRow {
1391 fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
1392 use sqlx::Row;
1393
1394 Ok(SyncSnapshotRow {
1395 id: row.try_get("id")?,
1396 endpoint: row.try_get("endpoint")?,
1397 method: row.try_get("method")?,
1398 sync_cycle_id: row.try_get("sync_cycle_id")?,
1399 timestamp: row.try_get("timestamp")?,
1400 before_status_code: row.try_get("before_status_code")?,
1401 after_status_code: row.try_get("after_status_code")?,
1402 before_body: row.try_get("before_body")?,
1403 after_body: row.try_get("after_body")?,
1404 before_headers: row.try_get("before_headers")?,
1405 after_headers: row.try_get("after_headers")?,
1406 response_time_before_ms: row.try_get("response_time_before_ms")?,
1407 response_time_after_ms: row.try_get("response_time_after_ms")?,
1408 changes_summary: row.try_get("changes_summary")?,
1409 })
1410 }
1411}
1412
/// Aggregate counters for the recording database.
#[derive(Debug, Clone)]
pub struct DatabaseStats {
    // Total recorded requests — presumably a row count of `requests`;
    // confirm against `get_stats`.
    pub total_requests: i64,
    // Total recorded responses.
    pub total_responses: i64,
    // Total stored size in bytes — presumably summed from
    // `responses.size_bytes`; confirm against the aggregating query.
    pub total_size_bytes: i64,
}
1420
/// Request statistics broken down by protocol and status code.
#[derive(Debug, Clone)]
pub struct DetailedStats {
    pub total_requests: i64,
    // Request count keyed by protocol name.
    pub by_protocol: HashMap<String, i64>,
    // Request count keyed by status code.
    pub by_status_code: HashMap<i32, i64>,
    // Average request duration in milliseconds; None presumably when no
    // durations have been recorded — confirm against the aggregating query.
    pub avg_duration_ms: Option<f64>,
}
1429
1430impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for RecordedRequest {
1432 fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
1433 use sqlx::Row;
1434
1435 Ok(RecordedRequest {
1436 id: row.try_get("id")?,
1437 protocol: row.try_get("protocol")?,
1438 timestamp: row.try_get("timestamp")?,
1439 method: row.try_get("method")?,
1440 path: row.try_get("path")?,
1441 query_params: row.try_get("query_params")?,
1442 headers: row.try_get("headers")?,
1443 body: row.try_get("body")?,
1444 body_encoding: row.try_get("body_encoding")?,
1445 client_ip: row.try_get("client_ip")?,
1446 trace_id: row.try_get("trace_id")?,
1447 span_id: row.try_get("span_id")?,
1448 duration_ms: row.try_get("duration_ms")?,
1449 status_code: row.try_get("status_code")?,
1450 tags: row.try_get("tags")?,
1451 })
1452 }
1453}
1454
1455impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for RecordedResponse {
1457 fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
1458 use sqlx::Row;
1459
1460 Ok(RecordedResponse {
1461 request_id: row.try_get("request_id")?,
1462 status_code: row.try_get("status_code")?,
1463 headers: row.try_get("headers")?,
1464 body: row.try_get("body")?,
1465 body_encoding: row.try_get("body_encoding")?,
1466 size_bytes: row.try_get("size_bytes")?,
1467 timestamp: row.try_get("timestamp")?,
1468 })
1469 }
1470}
1471
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// A fresh in-memory database starts out with zero recorded requests.
    #[tokio::test]
    async fn test_database_creation() {
        let database = RecorderDatabase::new_in_memory().await.unwrap();
        let stats = database.get_stats().await.unwrap();
        assert_eq!(stats.total_requests, 0);
    }

    /// Round-trip: a request inserted by id can be fetched back intact.
    #[tokio::test]
    async fn test_insert_and_get_request() {
        let database = RecorderDatabase::new_in_memory().await.unwrap();

        let req = RecordedRequest {
            id: "test-123".to_string(),
            protocol: Protocol::Http,
            timestamp: Utc::now(),
            method: "GET".to_string(),
            path: "/api/test".to_string(),
            query_params: None,
            headers: "{}".to_string(),
            body: None,
            body_encoding: "utf8".to_string(),
            client_ip: Some("127.0.0.1".to_string()),
            trace_id: None,
            span_id: None,
            duration_ms: Some(42),
            status_code: Some(200),
            tags: None,
        };

        database.insert_request(&req).await.unwrap();

        let fetched = database.get_request("test-123").await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().path, "/api/test");
    }
}