// mockforge_recorder/src/database.rs
1//! SQLite database for storing recorded requests and responses
2
use crate::{models::*, Result};
use sqlx::{
    sqlite::{SqliteConnectOptions, SqlitePool},
    Pool, Sqlite,
};
use std::{collections::HashMap, path::Path};
use tracing::{debug, info};
7
/// SQLite database for recorder
///
/// Thin, cloneable handle around a `sqlx` SQLite connection pool; clones
/// share the same underlying pool.
#[derive(Clone)]
pub struct RecorderDatabase {
    // Shared connection pool; all queries in this type go through it.
    pool: Pool<Sqlite>,
}
13
14impl RecorderDatabase {
15    /// Create a new database connection
16    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
17        let db_url = format!("sqlite:{}?mode=rwc", path.as_ref().display());
18        let pool = SqlitePool::connect(&db_url).await?;
19
20        let db = Self { pool };
21        db.initialize_schema().await?;
22
23        info!("Recorder database initialized at {:?}", path.as_ref());
24        Ok(db)
25    }
26
27    /// Create an in-memory database (for testing)
28    pub async fn new_in_memory() -> Result<Self> {
29        let pool = SqlitePool::connect("sqlite::memory:").await?;
30
31        let db = Self { pool };
32        db.initialize_schema().await?;
33
34        debug!("In-memory recorder database initialized");
35        Ok(db)
36    }
37
    /// Initialize database schema
    ///
    /// Creates every table and index used by the recorder. All statements use
    /// `IF NOT EXISTS`, so this is idempotent and safe to run on every
    /// connection (it is called from both constructors).
    ///
    /// NOTE(review): `responses` and `flow_steps` declare `ON DELETE CASCADE`
    /// foreign keys; SQLite only enforces them when the connection has
    /// `foreign_keys` enabled — confirm the pool's connect options do so.
    async fn initialize_schema(&self) -> Result<()> {
        // Create requests table
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS requests (
                id TEXT PRIMARY KEY,
                protocol TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                method TEXT NOT NULL,
                path TEXT NOT NULL,
                query_params TEXT,
                headers TEXT NOT NULL,
                body TEXT,
                body_encoding TEXT NOT NULL DEFAULT 'utf8',
                client_ip TEXT,
                trace_id TEXT,
                span_id TEXT,
                duration_ms INTEGER,
                status_code INTEGER,
                tags TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create responses table (1:1 with requests via request_id PK)
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS responses (
                request_id TEXT PRIMARY KEY,
                status_code INTEGER NOT NULL,
                headers TEXT NOT NULL,
                body TEXT,
                body_encoding TEXT NOT NULL DEFAULT 'utf8',
                size_bytes INTEGER NOT NULL,
                timestamp TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (request_id) REFERENCES requests(id) ON DELETE CASCADE
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create indexes for common queries
        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_requests_timestamp ON requests(timestamp DESC)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_protocol ON requests(protocol)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_method ON requests(method)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_path ON requests(path)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_trace_id ON requests(trace_id)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_status_code ON requests(status_code)")
            .execute(&self.pool)
            .await?;

        // Create sync_snapshots table for Shadow Snapshot Mode
        // (before/after bodies are stored base64-encoded by insert_sync_snapshot)
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS sync_snapshots (
                id TEXT PRIMARY KEY,
                endpoint TEXT NOT NULL,
                method TEXT NOT NULL,
                sync_cycle_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                before_status_code INTEGER NOT NULL,
                after_status_code INTEGER NOT NULL,
                before_body TEXT NOT NULL,
                after_body TEXT NOT NULL,
                before_headers TEXT NOT NULL,
                after_headers TEXT NOT NULL,
                response_time_before_ms INTEGER,
                response_time_after_ms INTEGER,
                changes_summary TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create indexes for sync_snapshots
        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_sync_snapshots_endpoint ON sync_snapshots(endpoint, method, timestamp DESC)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_sync_snapshots_cycle ON sync_snapshots(sync_cycle_id)",
        )
        .execute(&self.pool)
        .await?;

        // Create behavioral_sequences table for Behavioral Cloning
        // (steps / learned_from / tags are JSON-serialized strings)
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS behavioral_sequences (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                steps TEXT NOT NULL,
                frequency REAL NOT NULL,
                confidence REAL NOT NULL,
                learned_from TEXT,
                description TEXT,
                tags TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create endpoint_probabilities table for Behavioral Cloning
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS endpoint_probabilities (
                endpoint TEXT NOT NULL,
                method TEXT NOT NULL,
                status_code_distribution TEXT NOT NULL,
                latency_distribution TEXT NOT NULL,
                error_patterns TEXT,
                payload_variations TEXT,
                sample_count INTEGER NOT NULL,
                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (endpoint, method)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create edge_case_patterns table for Behavioral Cloning
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS edge_case_patterns (
                id TEXT PRIMARY KEY,
                endpoint TEXT NOT NULL,
                method TEXT NOT NULL,
                pattern_type TEXT NOT NULL,
                original_probability REAL NOT NULL,
                amplified_probability REAL,
                conditions TEXT,
                sample_responses TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create indexes for behavioral cloning tables
        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_behavioral_sequences_name ON behavioral_sequences(name)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_endpoint_probabilities_endpoint ON endpoint_probabilities(endpoint, method)",
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_edge_case_patterns_endpoint ON edge_case_patterns(endpoint, method)",
        )
        .execute(&self.pool)
        .await?;

        // Create flows table for behavioral cloning v1
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS flows (
                id TEXT PRIMARY KEY,
                name TEXT,
                description TEXT,
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                tags TEXT,  -- JSON array
                metadata TEXT  -- JSON object
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create flow_steps table (ordered steps of a flow, keyed by index)
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS flow_steps (
                flow_id TEXT NOT NULL,
                step_index INTEGER NOT NULL,
                request_id TEXT NOT NULL,
                step_label TEXT,  -- e.g., "login", "list", "checkout"
                timing_ms INTEGER,  -- delay from previous step
                FOREIGN KEY (flow_id) REFERENCES flows(id) ON DELETE CASCADE,
                FOREIGN KEY (request_id) REFERENCES requests(id) ON DELETE CASCADE,
                PRIMARY KEY (flow_id, step_index)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create scenarios table (name+version pairs are unique)
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS scenarios (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                version TEXT NOT NULL,
                description TEXT,
                scenario_data TEXT NOT NULL,  -- JSON serialized BehavioralScenario
                metadata TEXT,  -- JSON object
                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
                tags TEXT,  -- JSON array
                UNIQUE(name, version)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        // Create indexes for flows and scenarios
        sqlx::query("CREATE INDEX IF NOT EXISTS idx_flows_created_at ON flows(created_at DESC)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_flow_steps_flow_id ON flow_steps(flow_id)")
            .execute(&self.pool)
            .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_scenarios_name ON scenarios(name, version)")
            .execute(&self.pool)
            .await?;

        debug!("Database schema initialized");
        Ok(())
    }
296
    /// Insert a new request
    ///
    /// Persists one `RecordedRequest` row. `created_at` is filled by the
    /// table default. The bind order below must match the column list in the
    /// INSERT statement exactly — keep them in sync when adding columns.
    pub async fn insert_request(&self, request: &RecordedRequest) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO requests (
                id, protocol, timestamp, method, path, query_params,
                headers, body, body_encoding, client_ip, trace_id, span_id,
                duration_ms, status_code, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&request.id)
        .bind(request.protocol)
        .bind(request.timestamp)
        .bind(&request.method)
        .bind(&request.path)
        .bind(&request.query_params)
        .bind(&request.headers)
        .bind(&request.body)
        .bind(&request.body_encoding)
        .bind(&request.client_ip)
        .bind(&request.trace_id)
        .bind(&request.span_id)
        .bind(request.duration_ms)
        .bind(request.status_code)
        .bind(&request.tags)
        .execute(&self.pool)
        .await?;

        debug!("Recorded request: {} {} {}", request.protocol, request.method, request.path);
        Ok(())
    }
329
    /// Insert a response
    ///
    /// Persists one `RecordedResponse` row keyed by its request id (the
    /// table's primary key, so at most one response per request). Bind order
    /// must match the column list exactly.
    pub async fn insert_response(&self, response: &RecordedResponse) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO responses (
                request_id, status_code, headers, body, body_encoding,
                size_bytes, timestamp
            ) VALUES (?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&response.request_id)
        .bind(response.status_code)
        .bind(&response.headers)
        .bind(&response.body)
        .bind(&response.body_encoding)
        .bind(response.size_bytes)
        .bind(response.timestamp)
        .execute(&self.pool)
        .await?;

        debug!("Recorded response for request: {}", response.request_id);
        Ok(())
    }
353
354    /// Get a request by ID
355    pub async fn get_request(&self, id: &str) -> Result<Option<RecordedRequest>> {
356        let request = sqlx::query_as::<_, RecordedRequest>(
357            r#"
358            SELECT id, protocol, timestamp, method, path, query_params,
359                   headers, body, body_encoding, client_ip, trace_id, span_id,
360                   duration_ms, status_code, tags
361            FROM requests WHERE id = ?
362            "#,
363        )
364        .bind(id)
365        .fetch_optional(&self.pool)
366        .await?;
367
368        Ok(request)
369    }
370
371    /// Get a response by request ID
372    pub async fn get_response(&self, request_id: &str) -> Result<Option<RecordedResponse>> {
373        let response = sqlx::query_as::<_, RecordedResponse>(
374            r#"
375            SELECT request_id, status_code, headers, body, body_encoding,
376                   size_bytes, timestamp
377            FROM responses WHERE request_id = ?
378            "#,
379        )
380        .bind(request_id)
381        .fetch_optional(&self.pool)
382        .await?;
383
384        Ok(response)
385    }
386
387    /// Get an exchange (request + response) by request ID
388    pub async fn get_exchange(&self, id: &str) -> Result<Option<RecordedExchange>> {
389        let request = self.get_request(id).await?;
390        if let Some(request) = request {
391            let response = self.get_response(id).await?;
392            Ok(Some(RecordedExchange { request, response }))
393        } else {
394            Ok(None)
395        }
396    }
397
    /// Update an existing response
    ///
    /// Overwrites the stored response for `request_id` with new status,
    /// headers, body, and size; the row's `timestamp` is reset to now.
    ///
    /// NOTE(review): `body_encoding` is forced to 'base64', so callers are
    /// expected to pass `body` already base64-encoded — confirm at call sites.
    /// Silently updates zero rows if no response exists for `request_id`.
    pub async fn update_response(
        &self,
        request_id: &str,
        status_code: i32,
        headers: &str,
        body: &str,
        size_bytes: i64,
    ) -> Result<()> {
        sqlx::query(
            r#"
            UPDATE responses
            SET status_code = ?,
                headers = ?,
                body = ?,
                body_encoding = 'base64',
                size_bytes = ?,
                timestamp = datetime('now')
            WHERE request_id = ?
            "#,
        )
        .bind(status_code)
        .bind(headers)
        .bind(body)
        .bind(size_bytes)
        .bind(request_id)
        .execute(&self.pool)
        .await?;

        debug!("Updated response for request {}", request_id);
        Ok(())
    }
430
431    /// List recent requests
432    pub async fn list_recent(&self, limit: i32) -> Result<Vec<RecordedRequest>> {
433        let requests = sqlx::query_as::<_, RecordedRequest>(
434            r#"
435            SELECT id, protocol, timestamp, method, path, query_params,
436                   headers, body, body_encoding, client_ip, trace_id, span_id,
437                   duration_ms, status_code, tags
438            FROM requests
439            ORDER BY timestamp DESC
440            LIMIT ?
441            "#,
442        )
443        .bind(limit)
444        .fetch_all(&self.pool)
445        .await?;
446
447        Ok(requests)
448    }
449
450    /// Delete old requests
451    pub async fn delete_older_than(&self, days: i64) -> Result<u64> {
452        let result = sqlx::query(
453            r#"
454            DELETE FROM requests
455            WHERE timestamp < datetime('now', ? || ' days')
456            "#,
457        )
458        .bind(format!("-{}", days))
459        .execute(&self.pool)
460        .await?;
461
462        info!("Deleted {} old requests", result.rows_affected());
463        Ok(result.rows_affected())
464    }
465
466    /// Get database statistics
467    pub async fn get_stats(&self) -> Result<DatabaseStats> {
468        let total_requests: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM requests")
469            .fetch_one(&self.pool)
470            .await?;
471
472        let total_responses: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM responses")
473            .fetch_one(&self.pool)
474            .await?;
475
476        let total_size: i64 =
477            sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM responses")
478                .fetch_one(&self.pool)
479                .await?;
480
481        Ok(DatabaseStats {
482            total_requests,
483            total_responses,
484            total_size_bytes: total_size,
485        })
486    }
487
488    /// Get detailed statistics for API
489    pub async fn get_statistics(&self) -> Result<DetailedStats> {
490        let total_requests: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM requests")
491            .fetch_one(&self.pool)
492            .await?;
493
494        // Get count by protocol
495        let protocol_rows: Vec<(String, i64)> =
496            sqlx::query_as("SELECT protocol, COUNT(*) as count FROM requests GROUP BY protocol")
497                .fetch_all(&self.pool)
498                .await?;
499
500        let by_protocol: HashMap<String, i64> = protocol_rows.into_iter().collect();
501
502        // Get count by status code
503        let status_rows: Vec<(i32, i64)> = sqlx::query_as(
504            "SELECT status_code, COUNT(*) as count FROM requests WHERE status_code IS NOT NULL GROUP BY status_code"
505        )
506        .fetch_all(&self.pool)
507        .await?;
508
509        let by_status_code: HashMap<i32, i64> = status_rows.into_iter().collect();
510
511        // Get average duration
512        let avg_duration: Option<f64> = sqlx::query_scalar(
513            "SELECT AVG(duration_ms) FROM requests WHERE duration_ms IS NOT NULL",
514        )
515        .fetch_one(&self.pool)
516        .await?;
517
518        Ok(DetailedStats {
519            total_requests,
520            by_protocol,
521            by_status_code,
522            avg_duration_ms: avg_duration,
523        })
524    }
525
    /// Clear all recordings
    ///
    /// Deletes responses before requests so the order is safe regardless of
    /// whether foreign-key cascading is active on this connection.
    pub async fn clear_all(&self) -> Result<()> {
        sqlx::query("DELETE FROM responses").execute(&self.pool).await?;
        sqlx::query("DELETE FROM requests").execute(&self.pool).await?;
        info!("Cleared all recordings");
        Ok(())
    }
533
    /// Close the database connection
    ///
    /// Consumes the handle and waits for the pool to shut down. Other clones
    /// of this `RecorderDatabase` share the pool and will stop working too.
    pub async fn close(self) {
        self.pool.close().await;
        debug!("Recorder database connection closed");
    }
539
    /// Insert a sync snapshot
    ///
    /// Serializes the snapshot for storage: headers and the change summary
    /// become JSON strings, and the raw before/after bodies are base64-encoded
    /// (the table columns are TEXT). `created_at` comes from the table default.
    pub async fn insert_sync_snapshot(
        &self,
        snapshot: &crate::sync_snapshots::SyncSnapshot,
    ) -> Result<()> {
        let before_headers_json = serde_json::to_string(&snapshot.before.headers)?;
        let after_headers_json = serde_json::to_string(&snapshot.after.headers)?;
        // Bodies may be arbitrary bytes; base64 keeps them valid TEXT.
        let before_body_encoded = base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            &snapshot.before.body,
        );
        let after_body_encoded = base64::Engine::encode(
            &base64::engine::general_purpose::STANDARD,
            &snapshot.after.body,
        );
        let changes_summary_json = serde_json::to_string(&snapshot.changes)?;

        sqlx::query(
            r#"
            INSERT INTO sync_snapshots (
                id, endpoint, method, sync_cycle_id, timestamp,
                before_status_code, after_status_code,
                before_body, after_body,
                before_headers, after_headers,
                response_time_before_ms, response_time_after_ms,
                changes_summary
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&snapshot.id)
        .bind(&snapshot.endpoint)
        .bind(&snapshot.method)
        .bind(&snapshot.sync_cycle_id)
        // Stored as an RFC 3339 string so SQLite text comparisons sort by time.
        .bind(snapshot.timestamp.to_rfc3339())
        .bind(snapshot.before.status_code as i32)
        .bind(snapshot.after.status_code as i32)
        .bind(&before_body_encoded)
        .bind(&after_body_encoded)
        .bind(&before_headers_json)
        .bind(&after_headers_json)
        .bind(snapshot.response_time_before.map(|v| v as i64))
        .bind(snapshot.response_time_after.map(|v| v as i64))
        .bind(&changes_summary_json)
        .execute(&self.pool)
        .await?;

        debug!(
            "Inserted sync snapshot: {} for {} {}",
            snapshot.id, snapshot.method, snapshot.endpoint
        );
        Ok(())
    }
592
593    /// Get snapshots for an endpoint
594    pub async fn get_snapshots_for_endpoint(
595        &self,
596        endpoint: &str,
597        method: Option<&str>,
598        limit: Option<i32>,
599    ) -> Result<Vec<crate::sync_snapshots::SyncSnapshot>> {
600        let limit = limit.unwrap_or(100);
601
602        // If endpoint is empty, get all snapshots
603        let query = if endpoint.is_empty() {
604            sqlx::query_as::<_, SyncSnapshotRow>(
605                r#"
606                SELECT id, endpoint, method, sync_cycle_id, timestamp,
607                       before_status_code, after_status_code,
608                       before_body, after_body,
609                       before_headers, after_headers,
610                       response_time_before_ms, response_time_after_ms,
611                       changes_summary
612                FROM sync_snapshots
613                ORDER BY timestamp DESC
614                LIMIT ?
615                "#,
616            )
617            .bind(limit)
618        } else if let Some(method) = method {
619            sqlx::query_as::<_, SyncSnapshotRow>(
620                r#"
621                SELECT id, endpoint, method, sync_cycle_id, timestamp,
622                       before_status_code, after_status_code,
623                       before_body, after_body,
624                       before_headers, after_headers,
625                       response_time_before_ms, response_time_after_ms,
626                       changes_summary
627                FROM sync_snapshots
628                WHERE endpoint = ? AND method = ?
629                ORDER BY timestamp DESC
630                LIMIT ?
631                "#,
632            )
633            .bind(endpoint)
634            .bind(method)
635            .bind(limit)
636        } else {
637            sqlx::query_as::<_, SyncSnapshotRow>(
638                r#"
639                SELECT id, endpoint, method, sync_cycle_id, timestamp,
640                       before_status_code, after_status_code,
641                       before_body, after_body,
642                       before_headers, after_headers,
643                       response_time_before_ms, response_time_after_ms,
644                       changes_summary
645                FROM sync_snapshots
646                WHERE endpoint = ?
647                ORDER BY timestamp DESC
648                LIMIT ?
649                "#,
650            )
651            .bind(endpoint)
652            .bind(limit)
653        };
654
655        let rows = query.fetch_all(&self.pool).await?;
656
657        let mut snapshots = Vec::new();
658        for row in rows {
659            snapshots.push(row.to_snapshot()?);
660        }
661
662        Ok(snapshots)
663    }
664
665    /// Get snapshots by sync cycle ID
666    pub async fn get_snapshots_by_cycle(
667        &self,
668        sync_cycle_id: &str,
669    ) -> Result<Vec<crate::sync_snapshots::SyncSnapshot>> {
670        let rows = sqlx::query_as::<_, SyncSnapshotRow>(
671            r#"
672            SELECT id, endpoint, method, sync_cycle_id, timestamp,
673                   before_status_code, after_status_code,
674                   before_body, after_body,
675                   before_headers, after_headers,
676                   response_time_before_ms, response_time_after_ms,
677                   changes_summary
678            FROM sync_snapshots
679            WHERE sync_cycle_id = ?
680            ORDER BY timestamp DESC
681            "#,
682        )
683        .bind(sync_cycle_id)
684        .fetch_all(&self.pool)
685        .await?;
686
687        let mut snapshots = Vec::new();
688        for row in rows {
689            snapshots.push(row.to_snapshot()?);
690        }
691
692        Ok(snapshots)
693    }
694
695    /// Delete old snapshots (retention policy)
696    pub async fn delete_old_snapshots(&self, keep_per_endpoint: i32) -> Result<u64> {
697        // This is a simplified retention policy - keep the most recent N snapshots per endpoint+method
698        // SQLite doesn't support window functions well, so we'll use a subquery approach
699        let result = sqlx::query(
700            r#"
701            DELETE FROM sync_snapshots
702            WHERE id NOT IN (
703                SELECT id FROM sync_snapshots
704                ORDER BY timestamp DESC
705                LIMIT (
706                    SELECT COUNT(*) FROM (
707                        SELECT DISTINCT endpoint || '|' || method FROM sync_snapshots
708                    )
709                ) * ?
710            )
711            "#,
712        )
713        .bind(keep_per_endpoint)
714        .execute(&self.pool)
715        .await?;
716
717        info!(
718            "Deleted {} old snapshots (kept {} per endpoint)",
719            result.rows_affected(),
720            keep_per_endpoint
721        );
722        Ok(result.rows_affected())
723    }
724
    /// Insert a behavioral sequence
    ///
    /// Upserts by primary key (`INSERT OR REPLACE` on `id`), so re-learning a
    /// sequence overwrites the stored version. Complex fields (`steps`,
    /// `learned_from`, `tags`) are stored as JSON strings.
    pub async fn insert_behavioral_sequence(
        &self,
        sequence: &mockforge_core::behavioral_cloning::BehavioralSequence,
    ) -> Result<()> {
        let steps_json = serde_json::to_string(&sequence.steps)?;
        let learned_from_json = serde_json::to_string(&sequence.learned_from)?;
        let tags_json = serde_json::to_string(&sequence.tags)?;

        sqlx::query(
            r#"
            INSERT OR REPLACE INTO behavioral_sequences (
                id, name, steps, frequency, confidence, learned_from, description, tags
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&sequence.id)
        .bind(&sequence.name)
        .bind(&steps_json)
        .bind(sequence.frequency)
        .bind(sequence.confidence)
        .bind(&learned_from_json)
        .bind(&sequence.description)
        .bind(&tags_json)
        .execute(&self.pool)
        .await?;

        debug!("Inserted behavioral sequence: {}", sequence.id);
        Ok(())
    }
755
    /// Get all behavioral sequences
    ///
    /// Returns every stored sequence, most frequent (then most confident)
    /// first, deserializing the JSON columns written by
    /// `insert_behavioral_sequence`.
    ///
    /// NOTE(review): corrupt `steps` JSON aborts with an error, while corrupt
    /// `learned_from`/`tags` fall back to defaults — confirm this asymmetry
    /// is intentional.
    pub async fn get_behavioral_sequences(
        &self,
    ) -> Result<Vec<mockforge_core::behavioral_cloning::BehavioralSequence>> {
        let rows = sqlx::query(
            r#"
            SELECT id, name, steps, frequency, confidence, learned_from, description, tags
            FROM behavioral_sequences
            ORDER BY frequency DESC, confidence DESC
            "#,
        )
        .fetch_all(&self.pool)
        .await?;

        let mut sequences = Vec::new();
        for row in rows {
            use sqlx::Row;
            let steps_json: String = row.try_get("steps")?;
            let learned_from_json: String = row.try_get("learned_from")?;
            let tags_json: String = row.try_get("tags")?;

            sequences.push(mockforge_core::behavioral_cloning::BehavioralSequence {
                id: row.try_get("id")?,
                name: row.try_get("name")?,
                steps: serde_json::from_str(&steps_json)?,
                frequency: row.try_get("frequency")?,
                confidence: row.try_get("confidence")?,
                learned_from: serde_json::from_str(&learned_from_json).unwrap_or_default(),
                description: row.try_get("description")?,
                tags: serde_json::from_str(&tags_json).unwrap_or_default(),
            });
        }

        Ok(sequences)
    }
791
    /// Insert or update endpoint probability model
    ///
    /// Upserts on the (endpoint, method) primary key via `INSERT OR REPLACE`.
    /// Distribution and pattern fields are stored as JSON strings.
    pub async fn insert_endpoint_probability_model(
        &self,
        model: &mockforge_core::behavioral_cloning::EndpointProbabilityModel,
    ) -> Result<()> {
        let status_code_dist_json = serde_json::to_string(&model.status_code_distribution)?;
        let latency_dist_json = serde_json::to_string(&model.latency_distribution)?;
        let error_patterns_json = serde_json::to_string(&model.error_patterns)?;
        let payload_variations_json = serde_json::to_string(&model.payload_variations)?;

        sqlx::query(
            r#"
            INSERT OR REPLACE INTO endpoint_probabilities (
                endpoint, method, status_code_distribution, latency_distribution,
                error_patterns, payload_variations, sample_count, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&model.endpoint)
        .bind(&model.method)
        .bind(&status_code_dist_json)
        .bind(&latency_dist_json)
        .bind(&error_patterns_json)
        .bind(&payload_variations_json)
        // SQLite INTEGER is signed 64-bit; u64 counts above i64::MAX would
        // wrap here, which is not a realistic sample count.
        .bind(model.sample_count as i64)
        .bind(model.updated_at)
        .execute(&self.pool)
        .await?;

        debug!("Inserted probability model: {} {}", model.method, model.endpoint);
        Ok(())
    }
824
    /// Get endpoint probability model
    ///
    /// Looks up the stored model for an exact (endpoint, method) pair,
    /// deserializing the JSON columns written by
    /// `insert_endpoint_probability_model`. Returns `Ok(None)` when absent.
    ///
    /// `original_error_probabilities` is not persisted and is always restored
    /// as `None`.
    pub async fn get_endpoint_probability_model(
        &self,
        endpoint: &str,
        method: &str,
    ) -> Result<Option<mockforge_core::behavioral_cloning::EndpointProbabilityModel>> {
        let row = sqlx::query(
            r#"
            SELECT endpoint, method, status_code_distribution, latency_distribution,
                   error_patterns, payload_variations, sample_count, updated_at
            FROM endpoint_probabilities
            WHERE endpoint = ? AND method = ?
            "#,
        )
        .bind(endpoint)
        .bind(method)
        .fetch_optional(&self.pool)
        .await?;

        if let Some(row) = row {
            use sqlx::Row;
            let status_code_dist_json: String = row.try_get("status_code_distribution")?;
            let latency_dist_json: String = row.try_get("latency_distribution")?;
            let error_patterns_json: String = row.try_get("error_patterns")?;
            let payload_variations_json: String = row.try_get("payload_variations")?;

            Ok(Some(mockforge_core::behavioral_cloning::EndpointProbabilityModel {
                endpoint: row.try_get("endpoint")?,
                method: row.try_get("method")?,
                // Distributions must parse; corrupt JSON is a hard error,
                // while patterns/variations fall back to empty defaults.
                status_code_distribution: serde_json::from_str(&status_code_dist_json)?,
                latency_distribution: serde_json::from_str(&latency_dist_json)?,
                error_patterns: serde_json::from_str(&error_patterns_json).unwrap_or_default(),
                payload_variations: serde_json::from_str(&payload_variations_json)
                    .unwrap_or_default(),
                sample_count: row.try_get::<i64, _>("sample_count")? as u64,
                updated_at: row.try_get("updated_at")?,
                original_error_probabilities: None,
            }))
        } else {
            Ok(None)
        }
    }
867
868    /// Get all endpoint probability models
869    pub async fn get_all_endpoint_probability_models(
870        &self,
871    ) -> Result<Vec<mockforge_core::behavioral_cloning::EndpointProbabilityModel>> {
872        let rows = sqlx::query(
873            r#"
874            SELECT endpoint, method, status_code_distribution, latency_distribution,
875                   error_patterns, payload_variations, sample_count, updated_at
876            FROM endpoint_probabilities
877            ORDER BY sample_count DESC
878            "#,
879        )
880        .fetch_all(&self.pool)
881        .await?;
882
883        let mut models = Vec::new();
884        for row in rows {
885            use sqlx::Row;
886            let status_code_dist_json: String = row.try_get("status_code_distribution")?;
887            let latency_dist_json: String = row.try_get("latency_distribution")?;
888            let error_patterns_json: String = row.try_get("error_patterns")?;
889            let payload_variations_json: String = row.try_get("payload_variations")?;
890
891            models.push(mockforge_core::behavioral_cloning::EndpointProbabilityModel {
892                endpoint: row.try_get("endpoint")?,
893                method: row.try_get("method")?,
894                status_code_distribution: serde_json::from_str(&status_code_dist_json)?,
895                latency_distribution: serde_json::from_str(&latency_dist_json)?,
896                error_patterns: serde_json::from_str(&error_patterns_json).unwrap_or_default(),
897                payload_variations: serde_json::from_str(&payload_variations_json)
898                    .unwrap_or_default(),
899                sample_count: row.try_get::<i64, _>("sample_count")? as u64,
900                original_error_probabilities: None,
901                updated_at: row.try_get("updated_at")?,
902            });
903        }
904
905        Ok(models)
906    }
907
908    /// Get requests grouped by trace_id for sequence learning
909    pub async fn get_requests_by_trace(
910        &self,
911        min_requests_per_trace: Option<i32>,
912    ) -> Result<Vec<(String, Vec<RecordedRequest>)>> {
913        // Get all requests with trace_id, ordered by trace_id and timestamp
914        let requests = sqlx::query_as::<_, RecordedRequest>(
915            r#"
916            SELECT id, protocol, timestamp, method, path, query_params,
917                   headers, body, body_encoding, client_ip, trace_id, span_id,
918                   duration_ms, status_code, tags
919            FROM requests
920            WHERE trace_id IS NOT NULL AND trace_id != ''
921            ORDER BY trace_id, timestamp ASC
922            "#,
923        )
924        .fetch_all(&self.pool)
925        .await?;
926
927        // Group by trace_id
928        let mut grouped: HashMap<String, Vec<RecordedRequest>> = HashMap::new();
929        for request in requests {
930            if let Some(trace_id) = &request.trace_id {
931                grouped.entry(trace_id.clone()).or_default().push(request);
932            }
933        }
934
935        // Filter by minimum requests per trace if specified
936        let mut result: Vec<(String, Vec<RecordedRequest>)> = grouped
937            .into_iter()
938            .filter(|(_, requests)| {
939                min_requests_per_trace.is_none_or(|min| requests.len() >= min as usize)
940            })
941            .collect();
942
943        // Sort by trace_id for consistency
944        result.sort_by_key(|(trace_id, _)| trace_id.clone());
945
946        Ok(result)
947    }
948
949    /// Get requests and responses for a specific endpoint and method
950    ///
951    /// Returns a list of (request, response) pairs for building probability models.
952    pub async fn get_exchanges_for_endpoint(
953        &self,
954        endpoint: &str,
955        method: &str,
956        limit: Option<i32>,
957    ) -> Result<Vec<(RecordedRequest, Option<RecordedResponse>)>> {
958        let limit = limit.unwrap_or(10000);
959        let requests = sqlx::query_as::<_, RecordedRequest>(
960            r#"
961            SELECT id, protocol, timestamp, method, path, query_params,
962                   headers, body, body_encoding, client_ip, trace_id, span_id,
963                   duration_ms, status_code, tags
964            FROM requests
965            WHERE path = ? AND method = ?
966            ORDER BY timestamp DESC
967            LIMIT ?
968            "#,
969        )
970        .bind(endpoint)
971        .bind(method)
972        .bind(limit)
973        .fetch_all(&self.pool)
974        .await?;
975
976        let mut exchanges = Vec::new();
977        for request in requests {
978            let response = self.get_response(&request.id).await?;
979            exchanges.push((request, response));
980        }
981
982        Ok(exchanges)
983    }
984
985    /// Create a new flow
986    pub async fn create_flow(
987        &self,
988        flow_id: &str,
989        name: Option<&str>,
990        description: Option<&str>,
991        tags: &[String],
992    ) -> Result<()> {
993        let tags_json = serde_json::to_string(tags)?;
994        let metadata_json = serde_json::to_string(&HashMap::<String, serde_json::Value>::new())?;
995
996        sqlx::query(
997            r#"
998            INSERT INTO flows (id, name, description, created_at, tags, metadata)
999            VALUES (?, ?, ?, ?, ?, ?)
1000            "#,
1001        )
1002        .bind(flow_id)
1003        .bind(name)
1004        .bind(description)
1005        .bind(chrono::Utc::now().to_rfc3339())
1006        .bind(&tags_json)
1007        .bind(&metadata_json)
1008        .execute(&self.pool)
1009        .await?;
1010
1011        debug!("Created flow: {}", flow_id);
1012        Ok(())
1013    }
1014
1015    /// Add a step to a flow
1016    pub async fn add_flow_step(
1017        &self,
1018        flow_id: &str,
1019        request_id: &str,
1020        step_index: usize,
1021        step_label: Option<&str>,
1022        timing_ms: Option<u64>,
1023    ) -> Result<()> {
1024        sqlx::query(
1025            r#"
1026            INSERT INTO flow_steps (flow_id, step_index, request_id, step_label, timing_ms)
1027            VALUES (?, ?, ?, ?, ?)
1028            "#,
1029        )
1030        .bind(flow_id)
1031        .bind(step_index as i64)
1032        .bind(request_id)
1033        .bind(step_label)
1034        .bind(timing_ms.map(|t| t as i64))
1035        .execute(&self.pool)
1036        .await?;
1037
1038        debug!("Added step {} to flow {}", step_index, flow_id);
1039        Ok(())
1040    }
1041
1042    /// Get the number of steps in a flow
1043    pub async fn get_flow_step_count(&self, flow_id: &str) -> Result<usize> {
1044        let count: Option<i64> =
1045            sqlx::query_scalar("SELECT COUNT(*) FROM flow_steps WHERE flow_id = ?")
1046                .bind(flow_id)
1047                .fetch_optional(&self.pool)
1048                .await?;
1049
1050        Ok(count.unwrap_or(0) as usize)
1051    }
1052
1053    /// Get flow steps
1054    pub async fn get_flow_steps(&self, flow_id: &str) -> Result<Vec<FlowStepRow>> {
1055        let rows = sqlx::query_as::<_, FlowStepRow>(
1056            r#"
1057            SELECT request_id, step_index, step_label, timing_ms
1058            FROM flow_steps
1059            WHERE flow_id = ?
1060            ORDER BY step_index ASC
1061            "#,
1062        )
1063        .bind(flow_id)
1064        .fetch_all(&self.pool)
1065        .await?;
1066
1067        Ok(rows)
1068    }
1069
1070    /// Get flow metadata
1071    pub async fn get_flow_metadata(&self, flow_id: &str) -> Result<Option<FlowMetadataRow>> {
1072        let row = sqlx::query_as::<_, FlowMetadataRow>(
1073            r#"
1074            SELECT id, name, description, created_at, tags, metadata
1075            FROM flows
1076            WHERE id = ?
1077            "#,
1078        )
1079        .bind(flow_id)
1080        .fetch_optional(&self.pool)
1081        .await?;
1082
1083        Ok(row)
1084    }
1085
1086    /// List flows
1087    pub async fn list_flows(&self, limit: Option<i64>) -> Result<Vec<FlowMetadataRow>> {
1088        let limit = limit.unwrap_or(100);
1089        let rows = sqlx::query_as::<_, FlowMetadataRow>(
1090            r#"
1091            SELECT id, name, description, created_at, tags, metadata
1092            FROM flows
1093            ORDER BY created_at DESC
1094            LIMIT ?
1095            "#,
1096        )
1097        .bind(limit)
1098        .fetch_all(&self.pool)
1099        .await?;
1100
1101        Ok(rows)
1102    }
1103
1104    /// Update flow metadata
1105    pub async fn update_flow_metadata(
1106        &self,
1107        flow_id: &str,
1108        name: Option<&str>,
1109        description: Option<&str>,
1110        tags: Option<&[String]>,
1111    ) -> Result<()> {
1112        if let Some(tags) = tags {
1113            let tags_json = serde_json::to_string(tags)?;
1114            sqlx::query(
1115                r#"
1116                UPDATE flows
1117                SET name = COALESCE(?, name),
1118                    description = COALESCE(?, description),
1119                    tags = ?
1120                WHERE id = ?
1121                "#,
1122            )
1123            .bind(name)
1124            .bind(description)
1125            .bind(&tags_json)
1126            .bind(flow_id)
1127            .execute(&self.pool)
1128            .await?;
1129        } else {
1130            sqlx::query(
1131                r#"
1132                UPDATE flows
1133                SET name = COALESCE(?, name),
1134                    description = COALESCE(?, description)
1135                WHERE id = ?
1136                "#,
1137            )
1138            .bind(name)
1139            .bind(description)
1140            .bind(flow_id)
1141            .execute(&self.pool)
1142            .await?;
1143        }
1144
1145        info!("Updated flow metadata: {}", flow_id);
1146        Ok(())
1147    }
1148
1149    /// Delete a flow
1150    pub async fn delete_flow(&self, flow_id: &str) -> Result<()> {
1151        sqlx::query("DELETE FROM flows WHERE id = ?")
1152            .bind(flow_id)
1153            .execute(&self.pool)
1154            .await?;
1155
1156        info!("Deleted flow: {}", flow_id);
1157        Ok(())
1158    }
1159
1160    /// Store a behavioral scenario
1161    pub async fn store_scenario(
1162        &self,
1163        scenario: &crate::behavioral_cloning::BehavioralScenario,
1164        version: &str,
1165    ) -> Result<()> {
1166        let scenario_json = serde_json::to_string(scenario)?;
1167        let metadata_json = serde_json::to_string(&scenario.metadata)?;
1168        let tags_json = serde_json::to_string(&scenario.tags)?;
1169
1170        sqlx::query(
1171            r#"
1172            INSERT OR REPLACE INTO scenarios (
1173                id, name, version, description, scenario_data, metadata, updated_at, tags
1174            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1175            "#,
1176        )
1177        .bind(&scenario.id)
1178        .bind(&scenario.name)
1179        .bind(version)
1180        .bind(&scenario.description)
1181        .bind(&scenario_json)
1182        .bind(&metadata_json)
1183        .bind(chrono::Utc::now().to_rfc3339())
1184        .bind(&tags_json)
1185        .execute(&self.pool)
1186        .await?;
1187
1188        debug!("Stored scenario: {} v{}", scenario.id, version);
1189        Ok(())
1190    }
1191
1192    /// Get a scenario by ID
1193    pub async fn get_scenario(
1194        &self,
1195        scenario_id: &str,
1196    ) -> Result<Option<crate::behavioral_cloning::BehavioralScenario>> {
1197        let row = sqlx::query(
1198            r#"
1199            SELECT scenario_data
1200            FROM scenarios
1201            WHERE id = ?
1202            ORDER BY version DESC
1203            LIMIT 1
1204            "#,
1205        )
1206        .bind(scenario_id)
1207        .fetch_optional(&self.pool)
1208        .await?;
1209
1210        if let Some(row) = row {
1211            use sqlx::Row;
1212            let scenario_json: String = row.try_get("scenario_data")?;
1213            let scenario: crate::behavioral_cloning::BehavioralScenario =
1214                serde_json::from_str(&scenario_json)?;
1215            Ok(Some(scenario))
1216        } else {
1217            Ok(None)
1218        }
1219    }
1220
1221    /// Get a scenario by name and version
1222    pub async fn get_scenario_by_name_version(
1223        &self,
1224        name: &str,
1225        version: &str,
1226    ) -> Result<Option<crate::behavioral_cloning::BehavioralScenario>> {
1227        let row = sqlx::query(
1228            r#"
1229            SELECT scenario_data
1230            FROM scenarios
1231            WHERE name = ? AND version = ?
1232            "#,
1233        )
1234        .bind(name)
1235        .bind(version)
1236        .fetch_optional(&self.pool)
1237        .await?;
1238
1239        if let Some(row) = row {
1240            use sqlx::Row;
1241            let scenario_json: String = row.try_get("scenario_data")?;
1242            let scenario: crate::behavioral_cloning::BehavioralScenario =
1243                serde_json::from_str(&scenario_json)?;
1244            Ok(Some(scenario))
1245        } else {
1246            Ok(None)
1247        }
1248    }
1249
1250    /// List all scenarios
1251    pub async fn list_scenarios(&self, limit: Option<i64>) -> Result<Vec<ScenarioMetadataRow>> {
1252        let limit = limit.unwrap_or(100);
1253        let rows = sqlx::query_as::<_, ScenarioMetadataRow>(
1254            r#"
1255            SELECT id, name, version, description, created_at, updated_at, tags
1256            FROM scenarios
1257            ORDER BY name, version DESC
1258            LIMIT ?
1259            "#,
1260        )
1261        .bind(limit)
1262        .fetch_all(&self.pool)
1263        .await?;
1264
1265        Ok(rows)
1266    }
1267
1268    /// Delete a scenario
1269    pub async fn delete_scenario(&self, scenario_id: &str) -> Result<()> {
1270        sqlx::query("DELETE FROM scenarios WHERE id = ?")
1271            .bind(scenario_id)
1272            .execute(&self.pool)
1273            .await?;
1274
1275        info!("Deleted scenario: {}", scenario_id);
1276        Ok(())
1277    }
1278}
1279
1280/// Flow step row from database
1281#[derive(Debug, Clone, sqlx::FromRow)]
1282pub struct FlowStepRow {
1283    pub request_id: String,
1284    #[sqlx(rename = "step_index")]
1285    pub step_index: i64,
1286    pub step_label: Option<String>,
1287    pub timing_ms: Option<i64>,
1288}
1289
1290/// Flow metadata row from database
1291#[derive(Debug, Clone, sqlx::FromRow)]
1292pub struct FlowMetadataRow {
1293    pub id: String,
1294    pub name: Option<String>,
1295    pub description: Option<String>,
1296    #[sqlx(rename = "created_at")]
1297    pub created_at: String,
1298    pub tags: String,
1299    pub metadata: String,
1300}
1301
1302/// Scenario metadata row from database
1303#[derive(Debug, Clone, sqlx::FromRow)]
1304pub struct ScenarioMetadataRow {
1305    pub id: String,
1306    pub name: String,
1307    pub version: String,
1308    pub description: Option<String>,
1309    #[sqlx(rename = "created_at")]
1310    pub created_at: String,
1311    #[sqlx(rename = "updated_at")]
1312    pub updated_at: String,
1313    pub tags: String,
1314}
1315
/// Internal row representation for sync snapshots
///
/// Holds the raw column values as stored in the database; `to_snapshot`
/// below parses them into a typed `SyncSnapshot`. Headers and the change
/// summary are JSON strings; bodies are base64-encoded.
#[derive(Debug)]
struct SyncSnapshotRow {
    id: String,
    endpoint: String,
    method: String,
    sync_cycle_id: String,
    // RFC 3339 timestamp string (parsed in `to_snapshot`)
    timestamp: String,
    before_status_code: i32,
    after_status_code: i32,
    // Base64-encoded response bodies (decoded in `to_snapshot`)
    before_body: String,
    after_body: String,
    // JSON-encoded header maps (string -> string)
    before_headers: String,
    after_headers: String,
    response_time_before_ms: Option<i64>,
    response_time_after_ms: Option<i64>,
    // JSON-encoded `crate::diff::ComparisonResult`
    changes_summary: String,
}
1334
1335impl SyncSnapshotRow {
1336    fn to_snapshot(&self) -> Result<crate::sync_snapshots::SyncSnapshot> {
1337        use crate::sync_snapshots::{SnapshotData, SyncSnapshot};
1338        use std::collections::HashMap;
1339
1340        let timestamp = chrono::DateTime::parse_from_rfc3339(&self.timestamp)
1341            .map_err(|e| crate::RecorderError::InvalidFilter(format!("Invalid timestamp: {}", e)))?
1342            .with_timezone(&chrono::Utc);
1343
1344        let before_headers: HashMap<String, String> = serde_json::from_str(&self.before_headers)?;
1345        let after_headers: HashMap<String, String> = serde_json::from_str(&self.after_headers)?;
1346        let changes: crate::diff::ComparisonResult = serde_json::from_str(&self.changes_summary)?;
1347
1348        let before_body = base64::Engine::decode(
1349            &base64::engine::general_purpose::STANDARD,
1350            &self.before_body,
1351        )
1352        .map_err(|e| crate::RecorderError::InvalidFilter(format!("Invalid base64: {}", e)))?;
1353
1354        let after_body =
1355            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &self.after_body)
1356                .map_err(|e| {
1357                    crate::RecorderError::InvalidFilter(format!("Invalid base64: {}", e))
1358                })?;
1359
1360        let before_body_json = serde_json::from_slice(&before_body).ok();
1361        let after_body_json = serde_json::from_slice(&after_body).ok();
1362
1363        Ok(SyncSnapshot {
1364            id: self.id.clone(),
1365            endpoint: self.endpoint.clone(),
1366            method: self.method.clone(),
1367            sync_cycle_id: self.sync_cycle_id.clone(),
1368            timestamp,
1369            before: SnapshotData {
1370                status_code: self.before_status_code as u16,
1371                headers: before_headers,
1372                body: before_body,
1373                body_json: before_body_json,
1374            },
1375            after: SnapshotData {
1376                status_code: self.after_status_code as u16,
1377                headers: after_headers,
1378                body: after_body,
1379                body_json: after_body_json,
1380            },
1381            changes,
1382            response_time_before: self.response_time_before_ms.map(|v| v as u64),
1383            response_time_after: self.response_time_after_ms.map(|v| v as u64),
1384        })
1385    }
1386}
1387
// Manual FromRow for SyncSnapshotRow: maps each selected column by name.
// Any missing or mistyped column surfaces as a sqlx::Error via `?`, aborting
// the row conversion.
impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for SyncSnapshotRow {
    fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
        use sqlx::Row;

        Ok(SyncSnapshotRow {
            id: row.try_get("id")?,
            endpoint: row.try_get("endpoint")?,
            method: row.try_get("method")?,
            sync_cycle_id: row.try_get("sync_cycle_id")?,
            timestamp: row.try_get("timestamp")?,
            before_status_code: row.try_get("before_status_code")?,
            after_status_code: row.try_get("after_status_code")?,
            before_body: row.try_get("before_body")?,
            after_body: row.try_get("after_body")?,
            before_headers: row.try_get("before_headers")?,
            after_headers: row.try_get("after_headers")?,
            response_time_before_ms: row.try_get("response_time_before_ms")?,
            response_time_after_ms: row.try_get("response_time_after_ms")?,
            changes_summary: row.try_get("changes_summary")?,
        })
    }
}
1410
/// Database statistics
///
/// Aggregate counters returned by `get_stats` (see the tests below).
#[derive(Debug, Clone)]
pub struct DatabaseStats {
    /// Total number of recorded requests
    pub total_requests: i64,
    /// Total number of recorded responses
    pub total_responses: i64,
    /// Total stored size in bytes — NOTE(review): how this is computed is
    /// not visible in this part of the file; confirm against `get_stats`.
    pub total_size_bytes: i64,
}
1418
/// Detailed statistics for API
///
/// NOTE(review): the query producing these values is not visible in this
/// part of the file; field meanings below follow from names/types — confirm
/// against the producing method.
#[derive(Debug, Clone)]
pub struct DetailedStats {
    /// Total number of recorded requests
    pub total_requests: i64,
    /// Request counts keyed by protocol
    pub by_protocol: HashMap<String, i64>,
    /// Request counts keyed by status code
    pub by_status_code: HashMap<i32, i64>,
    /// Average request duration in milliseconds, if available
    pub avg_duration_ms: Option<f64>,
}
1427
// Implement FromRow for RecordedRequest
// Maps a `requests` row column-by-column; `try_get` yields a sqlx::Error if
// a column is absent or has an incompatible type, aborting the conversion
// via `?`.
impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for RecordedRequest {
    fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
        use sqlx::Row;

        Ok(RecordedRequest {
            id: row.try_get("id")?,
            protocol: row.try_get("protocol")?,
            timestamp: row.try_get("timestamp")?,
            method: row.try_get("method")?,
            path: row.try_get("path")?,
            query_params: row.try_get("query_params")?,
            headers: row.try_get("headers")?,
            body: row.try_get("body")?,
            body_encoding: row.try_get("body_encoding")?,
            client_ip: row.try_get("client_ip")?,
            trace_id: row.try_get("trace_id")?,
            span_id: row.try_get("span_id")?,
            duration_ms: row.try_get("duration_ms")?,
            status_code: row.try_get("status_code")?,
            tags: row.try_get("tags")?,
        })
    }
}
1452
// Implement FromRow for RecordedResponse
// Maps the selected response columns by name; `try_get` yields a sqlx::Error
// if a column is absent or has an incompatible type.
impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for RecordedResponse {
    fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
        use sqlx::Row;

        Ok(RecordedResponse {
            request_id: row.try_get("request_id")?,
            status_code: row.try_get("status_code")?,
            headers: row.try_get("headers")?,
            body: row.try_get("body")?,
            body_encoding: row.try_get("body_encoding")?,
            size_bytes: row.try_get("size_bytes")?,
            timestamp: row.try_get("timestamp")?,
        })
    }
}
1469
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// A freshly created in-memory database starts out empty.
    #[tokio::test]
    async fn test_database_creation() {
        let db = RecorderDatabase::new_in_memory().await.unwrap();
        assert_eq!(db.get_stats().await.unwrap().total_requests, 0);
    }

    /// A request round-trips through insert_request / get_request.
    #[tokio::test]
    async fn test_insert_and_get_request() {
        let db = RecorderDatabase::new_in_memory().await.unwrap();

        let recorded = RecordedRequest {
            id: "test-123".to_string(),
            protocol: Protocol::Http,
            timestamp: Utc::now(),
            method: "GET".to_string(),
            path: "/api/test".to_string(),
            query_params: None,
            headers: "{}".to_string(),
            body: None,
            body_encoding: "utf8".to_string(),
            client_ip: Some("127.0.0.1".to_string()),
            trace_id: None,
            span_id: None,
            duration_ms: Some(42),
            status_code: Some(200),
            tags: None,
        };

        db.insert_request(&recorded).await.unwrap();

        let fetched = db.get_request("test-123").await.unwrap();
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().path, "/api/test");
    }
}