mockforge_recorder/
database.rs

1//! SQLite database for storing recorded requests and responses
2
3use crate::{models::*, Result};
4use sqlx::{sqlite::SqlitePool, Pool, Sqlite};
5use std::{collections::HashMap, path::Path};
6use tracing::{debug, info};
7
/// SQLite database for recorder
///
/// Wraps the sqlx connection pool that every query method of this type
/// executes against. The type derives `Clone`.
// NOTE(review): sqlx pools are documented as cheaply clonable handles, so
// clones presumably share the same connections — confirm against sqlx docs.
#[derive(Clone)]
pub struct RecorderDatabase {
    // Pool used by all query methods (`&self.pool` is the executor).
    pool: Pool<Sqlite>,
}
13
14impl RecorderDatabase {
15    /// Create a new database connection
16    pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
17        let db_url = format!("sqlite:{}?mode=rwc", path.as_ref().display());
18        let pool = SqlitePool::connect(&db_url).await?;
19
20        let db = Self { pool };
21        db.initialize_schema().await?;
22
23        info!("Recorder database initialized at {:?}", path.as_ref());
24        Ok(db)
25    }
26
27    /// Create an in-memory database (for testing)
28    pub async fn new_in_memory() -> Result<Self> {
29        let pool = SqlitePool::connect("sqlite::memory:").await?;
30
31        let db = Self { pool };
32        db.initialize_schema().await?;
33
34        debug!("In-memory recorder database initialized");
35        Ok(db)
36    }
37
38    /// Initialize database schema
39    async fn initialize_schema(&self) -> Result<()> {
40        // Create requests table
41        sqlx::query(
42            r#"
43            CREATE TABLE IF NOT EXISTS requests (
44                id TEXT PRIMARY KEY,
45                protocol TEXT NOT NULL,
46                timestamp TEXT NOT NULL,
47                method TEXT NOT NULL,
48                path TEXT NOT NULL,
49                query_params TEXT,
50                headers TEXT NOT NULL,
51                body TEXT,
52                body_encoding TEXT NOT NULL DEFAULT 'utf8',
53                client_ip TEXT,
54                trace_id TEXT,
55                span_id TEXT,
56                duration_ms INTEGER,
57                status_code INTEGER,
58                tags TEXT,
59                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
60            )
61            "#,
62        )
63        .execute(&self.pool)
64        .await?;
65
66        // Create responses table
67        sqlx::query(
68            r#"
69            CREATE TABLE IF NOT EXISTS responses (
70                request_id TEXT PRIMARY KEY,
71                status_code INTEGER NOT NULL,
72                headers TEXT NOT NULL,
73                body TEXT,
74                body_encoding TEXT NOT NULL DEFAULT 'utf8',
75                size_bytes INTEGER NOT NULL,
76                timestamp TEXT NOT NULL,
77                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
78                FOREIGN KEY (request_id) REFERENCES requests(id) ON DELETE CASCADE
79            )
80            "#,
81        )
82        .execute(&self.pool)
83        .await?;
84
85        // Create indexes for common queries
86        sqlx::query(
87            "CREATE INDEX IF NOT EXISTS idx_requests_timestamp ON requests(timestamp DESC)",
88        )
89        .execute(&self.pool)
90        .await?;
91
92        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_protocol ON requests(protocol)")
93            .execute(&self.pool)
94            .await?;
95
96        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_method ON requests(method)")
97            .execute(&self.pool)
98            .await?;
99
100        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_path ON requests(path)")
101            .execute(&self.pool)
102            .await?;
103
104        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_trace_id ON requests(trace_id)")
105            .execute(&self.pool)
106            .await?;
107
108        sqlx::query("CREATE INDEX IF NOT EXISTS idx_requests_status_code ON requests(status_code)")
109            .execute(&self.pool)
110            .await?;
111
112        // Create sync_snapshots table for Shadow Snapshot Mode
113        sqlx::query(
114            r#"
115            CREATE TABLE IF NOT EXISTS sync_snapshots (
116                id TEXT PRIMARY KEY,
117                endpoint TEXT NOT NULL,
118                method TEXT NOT NULL,
119                sync_cycle_id TEXT NOT NULL,
120                timestamp TEXT NOT NULL,
121                before_status_code INTEGER NOT NULL,
122                after_status_code INTEGER NOT NULL,
123                before_body TEXT NOT NULL,
124                after_body TEXT NOT NULL,
125                before_headers TEXT NOT NULL,
126                after_headers TEXT NOT NULL,
127                response_time_before_ms INTEGER,
128                response_time_after_ms INTEGER,
129                changes_summary TEXT NOT NULL,
130                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
131            )
132            "#,
133        )
134        .execute(&self.pool)
135        .await?;
136
137        // Create indexes for sync_snapshots
138        sqlx::query(
139            "CREATE INDEX IF NOT EXISTS idx_sync_snapshots_endpoint ON sync_snapshots(endpoint, method, timestamp DESC)",
140        )
141        .execute(&self.pool)
142        .await?;
143
144        sqlx::query(
145            "CREATE INDEX IF NOT EXISTS idx_sync_snapshots_cycle ON sync_snapshots(sync_cycle_id)",
146        )
147        .execute(&self.pool)
148        .await?;
149
150        // Create behavioral_sequences table for Behavioral Cloning
151        sqlx::query(
152            r#"
153            CREATE TABLE IF NOT EXISTS behavioral_sequences (
154                id TEXT PRIMARY KEY,
155                name TEXT NOT NULL,
156                steps TEXT NOT NULL,
157                frequency REAL NOT NULL,
158                confidence REAL NOT NULL,
159                learned_from TEXT,
160                description TEXT,
161                tags TEXT,
162                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
163            )
164            "#,
165        )
166        .execute(&self.pool)
167        .await?;
168
169        // Create endpoint_probabilities table for Behavioral Cloning
170        sqlx::query(
171            r#"
172            CREATE TABLE IF NOT EXISTS endpoint_probabilities (
173                endpoint TEXT NOT NULL,
174                method TEXT NOT NULL,
175                status_code_distribution TEXT NOT NULL,
176                latency_distribution TEXT NOT NULL,
177                error_patterns TEXT,
178                payload_variations TEXT,
179                sample_count INTEGER NOT NULL,
180                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
181                PRIMARY KEY (endpoint, method)
182            )
183            "#,
184        )
185        .execute(&self.pool)
186        .await?;
187
188        // Create edge_case_patterns table for Behavioral Cloning
189        sqlx::query(
190            r#"
191            CREATE TABLE IF NOT EXISTS edge_case_patterns (
192                id TEXT PRIMARY KEY,
193                endpoint TEXT NOT NULL,
194                method TEXT NOT NULL,
195                pattern_type TEXT NOT NULL,
196                original_probability REAL NOT NULL,
197                amplified_probability REAL,
198                conditions TEXT,
199                sample_responses TEXT,
200                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
201            )
202            "#,
203        )
204        .execute(&self.pool)
205        .await?;
206
207        // Create indexes for behavioral cloning tables
208        sqlx::query(
209            "CREATE INDEX IF NOT EXISTS idx_behavioral_sequences_name ON behavioral_sequences(name)",
210        )
211        .execute(&self.pool)
212        .await?;
213
214        sqlx::query(
215            "CREATE INDEX IF NOT EXISTS idx_endpoint_probabilities_endpoint ON endpoint_probabilities(endpoint, method)",
216        )
217        .execute(&self.pool)
218        .await?;
219
220        sqlx::query(
221            "CREATE INDEX IF NOT EXISTS idx_edge_case_patterns_endpoint ON edge_case_patterns(endpoint, method)",
222        )
223        .execute(&self.pool)
224        .await?;
225
226        // Create flows table for behavioral cloning v1
227        sqlx::query(
228            r#"
229            CREATE TABLE IF NOT EXISTS flows (
230                id TEXT PRIMARY KEY,
231                name TEXT,
232                description TEXT,
233                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
234                tags TEXT,  -- JSON array
235                metadata TEXT  -- JSON object
236            )
237            "#,
238        )
239        .execute(&self.pool)
240        .await?;
241
242        // Create flow_steps table
243        sqlx::query(
244            r#"
245            CREATE TABLE IF NOT EXISTS flow_steps (
246                flow_id TEXT NOT NULL,
247                step_index INTEGER NOT NULL,
248                request_id TEXT NOT NULL,
249                step_label TEXT,  -- e.g., "login", "list", "checkout"
250                timing_ms INTEGER,  -- delay from previous step
251                FOREIGN KEY (flow_id) REFERENCES flows(id) ON DELETE CASCADE,
252                FOREIGN KEY (request_id) REFERENCES requests(id) ON DELETE CASCADE,
253                PRIMARY KEY (flow_id, step_index)
254            )
255            "#,
256        )
257        .execute(&self.pool)
258        .await?;
259
260        // Create scenarios table
261        sqlx::query(
262            r#"
263            CREATE TABLE IF NOT EXISTS scenarios (
264                id TEXT PRIMARY KEY,
265                name TEXT NOT NULL,
266                version TEXT NOT NULL,
267                description TEXT,
268                scenario_data TEXT NOT NULL,  -- JSON serialized BehavioralScenario
269                metadata TEXT,  -- JSON object
270                created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
271                updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
272                tags TEXT,  -- JSON array
273                UNIQUE(name, version)
274            )
275            "#,
276        )
277        .execute(&self.pool)
278        .await?;
279
280        // Create indexes for flows and scenarios
281        sqlx::query("CREATE INDEX IF NOT EXISTS idx_flows_created_at ON flows(created_at DESC)")
282            .execute(&self.pool)
283            .await?;
284
285        sqlx::query("CREATE INDEX IF NOT EXISTS idx_flow_steps_flow_id ON flow_steps(flow_id)")
286            .execute(&self.pool)
287            .await?;
288
289        sqlx::query("CREATE INDEX IF NOT EXISTS idx_scenarios_name ON scenarios(name, version)")
290            .execute(&self.pool)
291            .await?;
292
293        debug!("Database schema initialized");
294        Ok(())
295    }
296
297    /// Insert a new request
298    pub async fn insert_request(&self, request: &RecordedRequest) -> Result<()> {
299        sqlx::query(
300            r#"
301            INSERT INTO requests (
302                id, protocol, timestamp, method, path, query_params,
303                headers, body, body_encoding, client_ip, trace_id, span_id,
304                duration_ms, status_code, tags
305            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
306            "#,
307        )
308        .bind(&request.id)
309        .bind(request.protocol)
310        .bind(request.timestamp)
311        .bind(&request.method)
312        .bind(&request.path)
313        .bind(&request.query_params)
314        .bind(&request.headers)
315        .bind(&request.body)
316        .bind(&request.body_encoding)
317        .bind(&request.client_ip)
318        .bind(&request.trace_id)
319        .bind(&request.span_id)
320        .bind(request.duration_ms)
321        .bind(request.status_code)
322        .bind(&request.tags)
323        .execute(&self.pool)
324        .await?;
325
326        debug!("Recorded request: {} {} {}", request.protocol, request.method, request.path);
327        Ok(())
328    }
329
330    /// Insert a response
331    pub async fn insert_response(&self, response: &RecordedResponse) -> Result<()> {
332        sqlx::query(
333            r#"
334            INSERT INTO responses (
335                request_id, status_code, headers, body, body_encoding,
336                size_bytes, timestamp
337            ) VALUES (?, ?, ?, ?, ?, ?, ?)
338            "#,
339        )
340        .bind(&response.request_id)
341        .bind(response.status_code)
342        .bind(&response.headers)
343        .bind(&response.body)
344        .bind(&response.body_encoding)
345        .bind(response.size_bytes)
346        .bind(response.timestamp)
347        .execute(&self.pool)
348        .await?;
349
350        debug!("Recorded response for request: {}", response.request_id);
351        Ok(())
352    }
353
354    /// Get a request by ID
355    pub async fn get_request(&self, id: &str) -> Result<Option<RecordedRequest>> {
356        let request = sqlx::query_as::<_, RecordedRequest>(
357            r#"
358            SELECT id, protocol, timestamp, method, path, query_params,
359                   headers, body, body_encoding, client_ip, trace_id, span_id,
360                   duration_ms, status_code, tags
361            FROM requests WHERE id = ?
362            "#,
363        )
364        .bind(id)
365        .fetch_optional(&self.pool)
366        .await?;
367
368        Ok(request)
369    }
370
371    /// Get a response by request ID
372    pub async fn get_response(&self, request_id: &str) -> Result<Option<RecordedResponse>> {
373        let response = sqlx::query_as::<_, RecordedResponse>(
374            r#"
375            SELECT request_id, status_code, headers, body, body_encoding,
376                   size_bytes, timestamp
377            FROM responses WHERE request_id = ?
378            "#,
379        )
380        .bind(request_id)
381        .fetch_optional(&self.pool)
382        .await?;
383
384        Ok(response)
385    }
386
387    /// Get an exchange (request + response) by request ID
388    pub async fn get_exchange(&self, id: &str) -> Result<Option<RecordedExchange>> {
389        let request = self.get_request(id).await?;
390        if let Some(request) = request {
391            let response = self.get_response(id).await?;
392            Ok(Some(RecordedExchange { request, response }))
393        } else {
394            Ok(None)
395        }
396    }
397
398    /// Update an existing response
399    pub async fn update_response(
400        &self,
401        request_id: &str,
402        status_code: i32,
403        headers: &str,
404        body: &str,
405        size_bytes: i64,
406    ) -> Result<()> {
407        sqlx::query(
408            r#"
409            UPDATE responses
410            SET status_code = ?,
411                headers = ?,
412                body = ?,
413                body_encoding = 'base64',
414                size_bytes = ?,
415                timestamp = datetime('now')
416            WHERE request_id = ?
417            "#,
418        )
419        .bind(status_code)
420        .bind(headers)
421        .bind(body)
422        .bind(size_bytes)
423        .bind(request_id)
424        .execute(&self.pool)
425        .await?;
426
427        debug!("Updated response for request {}", request_id);
428        Ok(())
429    }
430
431    /// List recent requests
432    pub async fn list_recent(&self, limit: i32) -> Result<Vec<RecordedRequest>> {
433        let requests = sqlx::query_as::<_, RecordedRequest>(
434            r#"
435            SELECT id, protocol, timestamp, method, path, query_params,
436                   headers, body, body_encoding, client_ip, trace_id, span_id,
437                   duration_ms, status_code, tags
438            FROM requests
439            ORDER BY timestamp DESC
440            LIMIT ?
441            "#,
442        )
443        .bind(limit)
444        .fetch_all(&self.pool)
445        .await?;
446
447        Ok(requests)
448    }
449
450    /// Delete old requests
451    pub async fn delete_older_than(&self, days: i64) -> Result<u64> {
452        let result = sqlx::query(
453            r#"
454            DELETE FROM requests
455            WHERE timestamp < datetime('now', ? || ' days')
456            "#,
457        )
458        .bind(format!("-{}", days))
459        .execute(&self.pool)
460        .await?;
461
462        info!("Deleted {} old requests", result.rows_affected());
463        Ok(result.rows_affected())
464    }
465
466    /// Get database statistics
467    pub async fn get_stats(&self) -> Result<DatabaseStats> {
468        let total_requests: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM requests")
469            .fetch_one(&self.pool)
470            .await?;
471
472        let total_responses: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM responses")
473            .fetch_one(&self.pool)
474            .await?;
475
476        let total_size: i64 =
477            sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM responses")
478                .fetch_one(&self.pool)
479                .await?;
480
481        Ok(DatabaseStats {
482            total_requests,
483            total_responses,
484            total_size_bytes: total_size,
485        })
486    }
487
488    /// Get detailed statistics for API
489    pub async fn get_statistics(&self) -> Result<DetailedStats> {
490        let total_requests: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM requests")
491            .fetch_one(&self.pool)
492            .await?;
493
494        // Get count by protocol
495        let protocol_rows: Vec<(String, i64)> =
496            sqlx::query_as("SELECT protocol, COUNT(*) as count FROM requests GROUP BY protocol")
497                .fetch_all(&self.pool)
498                .await?;
499
500        let by_protocol: HashMap<String, i64> = protocol_rows.into_iter().collect();
501
502        // Get count by status code
503        let status_rows: Vec<(i32, i64)> = sqlx::query_as(
504            "SELECT status_code, COUNT(*) as count FROM requests WHERE status_code IS NOT NULL GROUP BY status_code"
505        )
506        .fetch_all(&self.pool)
507        .await?;
508
509        let by_status_code: HashMap<i32, i64> = status_rows.into_iter().collect();
510
511        // Get average duration
512        let avg_duration: Option<f64> = sqlx::query_scalar(
513            "SELECT AVG(duration_ms) FROM requests WHERE duration_ms IS NOT NULL",
514        )
515        .fetch_one(&self.pool)
516        .await?;
517
518        Ok(DetailedStats {
519            total_requests,
520            by_protocol,
521            by_status_code,
522            avg_duration_ms: avg_duration,
523        })
524    }
525
526    /// Clear all recordings
527    pub async fn clear_all(&self) -> Result<()> {
528        sqlx::query("DELETE FROM responses").execute(&self.pool).await?;
529        sqlx::query("DELETE FROM requests").execute(&self.pool).await?;
530        info!("Cleared all recordings");
531        Ok(())
532    }
533
534    /// Close the database connection
535    pub async fn close(self) {
536        self.pool.close().await;
537        debug!("Recorder database connection closed");
538    }
539
540    /// Insert a sync snapshot
541    pub async fn insert_sync_snapshot(
542        &self,
543        snapshot: &crate::sync_snapshots::SyncSnapshot,
544    ) -> Result<()> {
545        let before_headers_json = serde_json::to_string(&snapshot.before.headers)?;
546        let after_headers_json = serde_json::to_string(&snapshot.after.headers)?;
547        let before_body_encoded = base64::Engine::encode(
548            &base64::engine::general_purpose::STANDARD,
549            &snapshot.before.body,
550        );
551        let after_body_encoded = base64::Engine::encode(
552            &base64::engine::general_purpose::STANDARD,
553            &snapshot.after.body,
554        );
555        let changes_summary_json = serde_json::to_string(&snapshot.changes)?;
556
557        sqlx::query(
558            r#"
559            INSERT INTO sync_snapshots (
560                id, endpoint, method, sync_cycle_id, timestamp,
561                before_status_code, after_status_code,
562                before_body, after_body,
563                before_headers, after_headers,
564                response_time_before_ms, response_time_after_ms,
565                changes_summary
566            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
567            "#,
568        )
569        .bind(&snapshot.id)
570        .bind(&snapshot.endpoint)
571        .bind(&snapshot.method)
572        .bind(&snapshot.sync_cycle_id)
573        .bind(snapshot.timestamp.to_rfc3339())
574        .bind(snapshot.before.status_code as i32)
575        .bind(snapshot.after.status_code as i32)
576        .bind(&before_body_encoded)
577        .bind(&after_body_encoded)
578        .bind(&before_headers_json)
579        .bind(&after_headers_json)
580        .bind(snapshot.response_time_before.map(|v| v as i64))
581        .bind(snapshot.response_time_after.map(|v| v as i64))
582        .bind(&changes_summary_json)
583        .execute(&self.pool)
584        .await?;
585
586        debug!(
587            "Inserted sync snapshot: {} for {} {}",
588            snapshot.id, snapshot.method, snapshot.endpoint
589        );
590        Ok(())
591    }
592
593    /// Get snapshots for an endpoint
594    pub async fn get_snapshots_for_endpoint(
595        &self,
596        endpoint: &str,
597        method: Option<&str>,
598        limit: Option<i32>,
599    ) -> Result<Vec<crate::sync_snapshots::SyncSnapshot>> {
600        let limit = limit.unwrap_or(100);
601
602        // If endpoint is empty, get all snapshots
603        let query = if endpoint.is_empty() {
604            sqlx::query_as::<_, SyncSnapshotRow>(
605                r#"
606                SELECT id, endpoint, method, sync_cycle_id, timestamp,
607                       before_status_code, after_status_code,
608                       before_body, after_body,
609                       before_headers, after_headers,
610                       response_time_before_ms, response_time_after_ms,
611                       changes_summary
612                FROM sync_snapshots
613                ORDER BY timestamp DESC
614                LIMIT ?
615                "#,
616            )
617            .bind(limit)
618        } else if let Some(method) = method {
619            sqlx::query_as::<_, SyncSnapshotRow>(
620                r#"
621                SELECT id, endpoint, method, sync_cycle_id, timestamp,
622                       before_status_code, after_status_code,
623                       before_body, after_body,
624                       before_headers, after_headers,
625                       response_time_before_ms, response_time_after_ms,
626                       changes_summary
627                FROM sync_snapshots
628                WHERE endpoint = ? AND method = ?
629                ORDER BY timestamp DESC
630                LIMIT ?
631                "#,
632            )
633            .bind(endpoint)
634            .bind(method)
635            .bind(limit)
636        } else {
637            sqlx::query_as::<_, SyncSnapshotRow>(
638                r#"
639                SELECT id, endpoint, method, sync_cycle_id, timestamp,
640                       before_status_code, after_status_code,
641                       before_body, after_body,
642                       before_headers, after_headers,
643                       response_time_before_ms, response_time_after_ms,
644                       changes_summary
645                FROM sync_snapshots
646                WHERE endpoint = ?
647                ORDER BY timestamp DESC
648                LIMIT ?
649                "#,
650            )
651            .bind(endpoint)
652            .bind(limit)
653        };
654
655        let rows = query.fetch_all(&self.pool).await?;
656
657        let mut snapshots = Vec::new();
658        for row in rows {
659            snapshots.push(row.to_snapshot()?);
660        }
661
662        Ok(snapshots)
663    }
664
665    /// Get snapshots by sync cycle ID
666    pub async fn get_snapshots_by_cycle(
667        &self,
668        sync_cycle_id: &str,
669    ) -> Result<Vec<crate::sync_snapshots::SyncSnapshot>> {
670        let rows = sqlx::query_as::<_, SyncSnapshotRow>(
671            r#"
672            SELECT id, endpoint, method, sync_cycle_id, timestamp,
673                   before_status_code, after_status_code,
674                   before_body, after_body,
675                   before_headers, after_headers,
676                   response_time_before_ms, response_time_after_ms,
677                   changes_summary
678            FROM sync_snapshots
679            WHERE sync_cycle_id = ?
680            ORDER BY timestamp DESC
681            "#,
682        )
683        .bind(sync_cycle_id)
684        .fetch_all(&self.pool)
685        .await?;
686
687        let mut snapshots = Vec::new();
688        for row in rows {
689            snapshots.push(row.to_snapshot()?);
690        }
691
692        Ok(snapshots)
693    }
694
695    /// Delete old snapshots (retention policy)
696    pub async fn delete_old_snapshots(&self, keep_per_endpoint: i32) -> Result<u64> {
697        // This is a simplified retention policy - keep the most recent N snapshots per endpoint+method
698        // SQLite doesn't support window functions well, so we'll use a subquery approach
699        let result = sqlx::query(
700            r#"
701            DELETE FROM sync_snapshots
702            WHERE id NOT IN (
703                SELECT id FROM sync_snapshots
704                ORDER BY timestamp DESC
705                LIMIT (
706                    SELECT COUNT(*) FROM (
707                        SELECT DISTINCT endpoint || '|' || method FROM sync_snapshots
708                    )
709                ) * ?
710            )
711            "#,
712        )
713        .bind(keep_per_endpoint)
714        .execute(&self.pool)
715        .await?;
716
717        info!(
718            "Deleted {} old snapshots (kept {} per endpoint)",
719            result.rows_affected(),
720            keep_per_endpoint
721        );
722        Ok(result.rows_affected())
723    }
724
725    /// Insert a behavioral sequence
726    pub async fn insert_behavioral_sequence(
727        &self,
728        sequence: &mockforge_core::behavioral_cloning::BehavioralSequence,
729    ) -> Result<()> {
730        let steps_json = serde_json::to_string(&sequence.steps)?;
731        let learned_from_json = serde_json::to_string(&sequence.learned_from)?;
732        let tags_json = serde_json::to_string(&sequence.tags)?;
733
734        sqlx::query(
735            r#"
736            INSERT OR REPLACE INTO behavioral_sequences (
737                id, name, steps, frequency, confidence, learned_from, description, tags
738            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
739            "#,
740        )
741        .bind(&sequence.id)
742        .bind(&sequence.name)
743        .bind(&steps_json)
744        .bind(sequence.frequency)
745        .bind(sequence.confidence)
746        .bind(&learned_from_json)
747        .bind(&sequence.description)
748        .bind(&tags_json)
749        .execute(&self.pool)
750        .await?;
751
752        debug!("Inserted behavioral sequence: {}", sequence.id);
753        Ok(())
754    }
755
756    /// Get all behavioral sequences
757    pub async fn get_behavioral_sequences(
758        &self,
759    ) -> Result<Vec<mockforge_core::behavioral_cloning::BehavioralSequence>> {
760        let rows = sqlx::query(
761            r#"
762            SELECT id, name, steps, frequency, confidence, learned_from, description, tags
763            FROM behavioral_sequences
764            ORDER BY frequency DESC, confidence DESC
765            "#,
766        )
767        .fetch_all(&self.pool)
768        .await?;
769
770        let mut sequences = Vec::new();
771        for row in rows {
772            use sqlx::Row;
773            let steps_json: String = row.try_get("steps")?;
774            let learned_from_json: String = row.try_get("learned_from")?;
775            let tags_json: String = row.try_get("tags")?;
776
777            sequences.push(mockforge_core::behavioral_cloning::BehavioralSequence {
778                id: row.try_get("id")?,
779                name: row.try_get("name")?,
780                steps: serde_json::from_str(&steps_json)?,
781                frequency: row.try_get("frequency")?,
782                confidence: row.try_get("confidence")?,
783                learned_from: serde_json::from_str(&learned_from_json).unwrap_or_default(),
784                description: row.try_get("description")?,
785                tags: serde_json::from_str(&tags_json).unwrap_or_default(),
786            });
787        }
788
789        Ok(sequences)
790    }
791
792    /// Insert or update endpoint probability model
793    pub async fn insert_endpoint_probability_model(
794        &self,
795        model: &mockforge_core::behavioral_cloning::EndpointProbabilityModel,
796    ) -> Result<()> {
797        let status_code_dist_json = serde_json::to_string(&model.status_code_distribution)?;
798        let latency_dist_json = serde_json::to_string(&model.latency_distribution)?;
799        let error_patterns_json = serde_json::to_string(&model.error_patterns)?;
800        let payload_variations_json = serde_json::to_string(&model.payload_variations)?;
801
802        sqlx::query(
803            r#"
804            INSERT OR REPLACE INTO endpoint_probabilities (
805                endpoint, method, status_code_distribution, latency_distribution,
806                error_patterns, payload_variations, sample_count, updated_at
807            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
808            "#,
809        )
810        .bind(&model.endpoint)
811        .bind(&model.method)
812        .bind(&status_code_dist_json)
813        .bind(&latency_dist_json)
814        .bind(&error_patterns_json)
815        .bind(&payload_variations_json)
816        .bind(model.sample_count as i64)
817        .bind(model.updated_at)
818        .execute(&self.pool)
819        .await?;
820
821        debug!("Inserted probability model: {} {}", model.method, model.endpoint);
822        Ok(())
823    }
824
825    /// Get endpoint probability model
826    pub async fn get_endpoint_probability_model(
827        &self,
828        endpoint: &str,
829        method: &str,
830    ) -> Result<Option<mockforge_core::behavioral_cloning::EndpointProbabilityModel>> {
831        let row = sqlx::query(
832            r#"
833            SELECT endpoint, method, status_code_distribution, latency_distribution,
834                   error_patterns, payload_variations, sample_count, updated_at
835            FROM endpoint_probabilities
836            WHERE endpoint = ? AND method = ?
837            "#,
838        )
839        .bind(endpoint)
840        .bind(method)
841        .fetch_optional(&self.pool)
842        .await?;
843
844        if let Some(row) = row {
845            use sqlx::Row;
846            let status_code_dist_json: String = row.try_get("status_code_distribution")?;
847            let latency_dist_json: String = row.try_get("latency_distribution")?;
848            let error_patterns_json: String = row.try_get("error_patterns")?;
849            let payload_variations_json: String = row.try_get("payload_variations")?;
850
851            Ok(Some(mockforge_core::behavioral_cloning::EndpointProbabilityModel {
852                endpoint: row.try_get("endpoint")?,
853                method: row.try_get("method")?,
854                status_code_distribution: serde_json::from_str(&status_code_dist_json)?,
855                latency_distribution: serde_json::from_str(&latency_dist_json)?,
856                error_patterns: serde_json::from_str(&error_patterns_json).unwrap_or_default(),
857                payload_variations: serde_json::from_str(&payload_variations_json)
858                    .unwrap_or_default(),
859                sample_count: row.try_get::<i64, _>("sample_count")? as u64,
860                updated_at: row.try_get("updated_at")?,
861                original_error_probabilities: None,
862            }))
863        } else {
864            Ok(None)
865        }
866    }
867
868    /// Get all endpoint probability models
869    pub async fn get_all_endpoint_probability_models(
870        &self,
871    ) -> Result<Vec<mockforge_core::behavioral_cloning::EndpointProbabilityModel>> {
872        let rows = sqlx::query(
873            r#"
874            SELECT endpoint, method, status_code_distribution, latency_distribution,
875                   error_patterns, payload_variations, sample_count, updated_at
876            FROM endpoint_probabilities
877            ORDER BY sample_count DESC
878            "#,
879        )
880        .fetch_all(&self.pool)
881        .await?;
882
883        let mut models = Vec::new();
884        for row in rows {
885            use sqlx::Row;
886            let status_code_dist_json: String = row.try_get("status_code_distribution")?;
887            let latency_dist_json: String = row.try_get("latency_distribution")?;
888            let error_patterns_json: String = row.try_get("error_patterns")?;
889            let payload_variations_json: String = row.try_get("payload_variations")?;
890
891            models.push(mockforge_core::behavioral_cloning::EndpointProbabilityModel {
892                endpoint: row.try_get("endpoint")?,
893                method: row.try_get("method")?,
894                status_code_distribution: serde_json::from_str(&status_code_dist_json)?,
895                latency_distribution: serde_json::from_str(&latency_dist_json)?,
896                error_patterns: serde_json::from_str(&error_patterns_json).unwrap_or_default(),
897                payload_variations: serde_json::from_str(&payload_variations_json)
898                    .unwrap_or_default(),
899                sample_count: row.try_get::<i64, _>("sample_count")? as u64,
900                original_error_probabilities: None,
901                updated_at: row.try_get("updated_at")?,
902            });
903        }
904
905        Ok(models)
906    }
907
908    /// Get requests grouped by trace_id for sequence learning
909    pub async fn get_requests_by_trace(
910        &self,
911        min_requests_per_trace: Option<i32>,
912    ) -> Result<Vec<(String, Vec<RecordedRequest>)>> {
913        // Get all requests with trace_id, ordered by trace_id and timestamp
914        let requests = sqlx::query_as::<_, RecordedRequest>(
915            r#"
916            SELECT id, protocol, timestamp, method, path, query_params,
917                   headers, body, body_encoding, client_ip, trace_id, span_id,
918                   duration_ms, status_code, tags
919            FROM requests
920            WHERE trace_id IS NOT NULL AND trace_id != ''
921            ORDER BY trace_id, timestamp ASC
922            "#,
923        )
924        .fetch_all(&self.pool)
925        .await?;
926
927        // Group by trace_id
928        let mut grouped: std::collections::HashMap<String, Vec<RecordedRequest>> =
929            std::collections::HashMap::new();
930        for request in requests {
931            if let Some(trace_id) = &request.trace_id {
932                grouped.entry(trace_id.clone()).or_insert_with(Vec::new).push(request);
933            }
934        }
935
936        // Filter by minimum requests per trace if specified
937        let mut result: Vec<(String, Vec<RecordedRequest>)> = grouped
938            .into_iter()
939            .filter(|(_, requests)| {
940                min_requests_per_trace.map_or(true, |min| requests.len() >= min as usize)
941            })
942            .collect();
943
944        // Sort by trace_id for consistency
945        result.sort_by_key(|(trace_id, _)| trace_id.clone());
946
947        Ok(result)
948    }
949
950    /// Get requests and responses for a specific endpoint and method
951    ///
952    /// Returns a list of (request, response) pairs for building probability models.
953    pub async fn get_exchanges_for_endpoint(
954        &self,
955        endpoint: &str,
956        method: &str,
957        limit: Option<i32>,
958    ) -> Result<Vec<(RecordedRequest, Option<RecordedResponse>)>> {
959        let limit = limit.unwrap_or(10000);
960        let requests = sqlx::query_as::<_, RecordedRequest>(
961            r#"
962            SELECT id, protocol, timestamp, method, path, query_params,
963                   headers, body, body_encoding, client_ip, trace_id, span_id,
964                   duration_ms, status_code, tags
965            FROM requests
966            WHERE path = ? AND method = ?
967            ORDER BY timestamp DESC
968            LIMIT ?
969            "#,
970        )
971        .bind(endpoint)
972        .bind(method)
973        .bind(limit)
974        .fetch_all(&self.pool)
975        .await?;
976
977        let mut exchanges = Vec::new();
978        for request in requests {
979            let response = self.get_response(&request.id).await?;
980            exchanges.push((request, response));
981        }
982
983        Ok(exchanges)
984    }
985
986    /// Create a new flow
987    pub async fn create_flow(
988        &self,
989        flow_id: &str,
990        name: Option<&str>,
991        description: Option<&str>,
992        tags: &[String],
993    ) -> Result<()> {
994        let tags_json = serde_json::to_string(tags)?;
995        let metadata_json =
996            serde_json::to_string(&std::collections::HashMap::<String, serde_json::Value>::new())?;
997
998        sqlx::query(
999            r#"
1000            INSERT INTO flows (id, name, description, created_at, tags, metadata)
1001            VALUES (?, ?, ?, ?, ?, ?)
1002            "#,
1003        )
1004        .bind(flow_id)
1005        .bind(name)
1006        .bind(description)
1007        .bind(chrono::Utc::now().to_rfc3339())
1008        .bind(&tags_json)
1009        .bind(&metadata_json)
1010        .execute(&self.pool)
1011        .await?;
1012
1013        debug!("Created flow: {}", flow_id);
1014        Ok(())
1015    }
1016
1017    /// Add a step to a flow
1018    pub async fn add_flow_step(
1019        &self,
1020        flow_id: &str,
1021        request_id: &str,
1022        step_index: usize,
1023        step_label: Option<&str>,
1024        timing_ms: Option<u64>,
1025    ) -> Result<()> {
1026        sqlx::query(
1027            r#"
1028            INSERT INTO flow_steps (flow_id, step_index, request_id, step_label, timing_ms)
1029            VALUES (?, ?, ?, ?, ?)
1030            "#,
1031        )
1032        .bind(flow_id)
1033        .bind(step_index as i64)
1034        .bind(request_id)
1035        .bind(step_label)
1036        .bind(timing_ms.map(|t| t as i64))
1037        .execute(&self.pool)
1038        .await?;
1039
1040        debug!("Added step {} to flow {}", step_index, flow_id);
1041        Ok(())
1042    }
1043
1044    /// Get the number of steps in a flow
1045    pub async fn get_flow_step_count(&self, flow_id: &str) -> Result<usize> {
1046        let count: Option<i64> =
1047            sqlx::query_scalar("SELECT COUNT(*) FROM flow_steps WHERE flow_id = ?")
1048                .bind(flow_id)
1049                .fetch_optional(&self.pool)
1050                .await?;
1051
1052        Ok(count.unwrap_or(0) as usize)
1053    }
1054
1055    /// Get flow steps
1056    pub async fn get_flow_steps(&self, flow_id: &str) -> Result<Vec<FlowStepRow>> {
1057        let rows = sqlx::query_as::<_, FlowStepRow>(
1058            r#"
1059            SELECT request_id, step_index, step_label, timing_ms
1060            FROM flow_steps
1061            WHERE flow_id = ?
1062            ORDER BY step_index ASC
1063            "#,
1064        )
1065        .bind(flow_id)
1066        .fetch_all(&self.pool)
1067        .await?;
1068
1069        Ok(rows)
1070    }
1071
1072    /// Get flow metadata
1073    pub async fn get_flow_metadata(&self, flow_id: &str) -> Result<Option<FlowMetadataRow>> {
1074        let row = sqlx::query_as::<_, FlowMetadataRow>(
1075            r#"
1076            SELECT id, name, description, created_at, tags, metadata
1077            FROM flows
1078            WHERE id = ?
1079            "#,
1080        )
1081        .bind(flow_id)
1082        .fetch_optional(&self.pool)
1083        .await?;
1084
1085        Ok(row)
1086    }
1087
1088    /// List flows
1089    pub async fn list_flows(&self, limit: Option<i64>) -> Result<Vec<FlowMetadataRow>> {
1090        let limit = limit.unwrap_or(100);
1091        let rows = sqlx::query_as::<_, FlowMetadataRow>(
1092            r#"
1093            SELECT id, name, description, created_at, tags, metadata
1094            FROM flows
1095            ORDER BY created_at DESC
1096            LIMIT ?
1097            "#,
1098        )
1099        .bind(limit)
1100        .fetch_all(&self.pool)
1101        .await?;
1102
1103        Ok(rows)
1104    }
1105
1106    /// Update flow metadata
1107    pub async fn update_flow_metadata(
1108        &self,
1109        flow_id: &str,
1110        name: Option<&str>,
1111        description: Option<&str>,
1112        tags: Option<&[String]>,
1113    ) -> Result<()> {
1114        if let Some(tags) = tags {
1115            let tags_json = serde_json::to_string(tags)?;
1116            sqlx::query(
1117                r#"
1118                UPDATE flows
1119                SET name = COALESCE(?, name),
1120                    description = COALESCE(?, description),
1121                    tags = ?
1122                WHERE id = ?
1123                "#,
1124            )
1125            .bind(name)
1126            .bind(description)
1127            .bind(&tags_json)
1128            .bind(flow_id)
1129            .execute(&self.pool)
1130            .await?;
1131        } else {
1132            sqlx::query(
1133                r#"
1134                UPDATE flows
1135                SET name = COALESCE(?, name),
1136                    description = COALESCE(?, description)
1137                WHERE id = ?
1138                "#,
1139            )
1140            .bind(name)
1141            .bind(description)
1142            .bind(flow_id)
1143            .execute(&self.pool)
1144            .await?;
1145        }
1146
1147        info!("Updated flow metadata: {}", flow_id);
1148        Ok(())
1149    }
1150
1151    /// Delete a flow
1152    pub async fn delete_flow(&self, flow_id: &str) -> Result<()> {
1153        sqlx::query("DELETE FROM flows WHERE id = ?")
1154            .bind(flow_id)
1155            .execute(&self.pool)
1156            .await?;
1157
1158        info!("Deleted flow: {}", flow_id);
1159        Ok(())
1160    }
1161
1162    /// Store a behavioral scenario
1163    pub async fn store_scenario(
1164        &self,
1165        scenario: &crate::behavioral_cloning::BehavioralScenario,
1166        version: &str,
1167    ) -> Result<()> {
1168        let scenario_json = serde_json::to_string(scenario)?;
1169        let metadata_json = serde_json::to_string(&scenario.metadata)?;
1170        let tags_json = serde_json::to_string(&scenario.tags)?;
1171
1172        sqlx::query(
1173            r#"
1174            INSERT OR REPLACE INTO scenarios (
1175                id, name, version, description, scenario_data, metadata, updated_at, tags
1176            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
1177            "#,
1178        )
1179        .bind(&scenario.id)
1180        .bind(&scenario.name)
1181        .bind(version)
1182        .bind(&scenario.description)
1183        .bind(&scenario_json)
1184        .bind(&metadata_json)
1185        .bind(chrono::Utc::now().to_rfc3339())
1186        .bind(&tags_json)
1187        .execute(&self.pool)
1188        .await?;
1189
1190        debug!("Stored scenario: {} v{}", scenario.id, version);
1191        Ok(())
1192    }
1193
1194    /// Get a scenario by ID
1195    pub async fn get_scenario(
1196        &self,
1197        scenario_id: &str,
1198    ) -> Result<Option<crate::behavioral_cloning::BehavioralScenario>> {
1199        let row = sqlx::query(
1200            r#"
1201            SELECT scenario_data
1202            FROM scenarios
1203            WHERE id = ?
1204            ORDER BY version DESC
1205            LIMIT 1
1206            "#,
1207        )
1208        .bind(scenario_id)
1209        .fetch_optional(&self.pool)
1210        .await?;
1211
1212        if let Some(row) = row {
1213            use sqlx::Row;
1214            let scenario_json: String = row.try_get("scenario_data")?;
1215            let scenario: crate::behavioral_cloning::BehavioralScenario =
1216                serde_json::from_str(&scenario_json)?;
1217            Ok(Some(scenario))
1218        } else {
1219            Ok(None)
1220        }
1221    }
1222
1223    /// Get a scenario by name and version
1224    pub async fn get_scenario_by_name_version(
1225        &self,
1226        name: &str,
1227        version: &str,
1228    ) -> Result<Option<crate::behavioral_cloning::BehavioralScenario>> {
1229        let row = sqlx::query(
1230            r#"
1231            SELECT scenario_data
1232            FROM scenarios
1233            WHERE name = ? AND version = ?
1234            "#,
1235        )
1236        .bind(name)
1237        .bind(version)
1238        .fetch_optional(&self.pool)
1239        .await?;
1240
1241        if let Some(row) = row {
1242            use sqlx::Row;
1243            let scenario_json: String = row.try_get("scenario_data")?;
1244            let scenario: crate::behavioral_cloning::BehavioralScenario =
1245                serde_json::from_str(&scenario_json)?;
1246            Ok(Some(scenario))
1247        } else {
1248            Ok(None)
1249        }
1250    }
1251
1252    /// List all scenarios
1253    pub async fn list_scenarios(&self, limit: Option<i64>) -> Result<Vec<ScenarioMetadataRow>> {
1254        let limit = limit.unwrap_or(100);
1255        let rows = sqlx::query_as::<_, ScenarioMetadataRow>(
1256            r#"
1257            SELECT id, name, version, description, created_at, updated_at, tags
1258            FROM scenarios
1259            ORDER BY name, version DESC
1260            LIMIT ?
1261            "#,
1262        )
1263        .bind(limit)
1264        .fetch_all(&self.pool)
1265        .await?;
1266
1267        Ok(rows)
1268    }
1269
1270    /// Delete a scenario
1271    pub async fn delete_scenario(&self, scenario_id: &str) -> Result<()> {
1272        sqlx::query("DELETE FROM scenarios WHERE id = ?")
1273            .bind(scenario_id)
1274            .execute(&self.pool)
1275            .await?;
1276
1277        info!("Deleted scenario: {}", scenario_id);
1278        Ok(())
1279    }
1280}
1281
1282/// Flow step row from database
1283#[derive(Debug, Clone, sqlx::FromRow)]
1284pub struct FlowStepRow {
1285    pub request_id: String,
1286    #[sqlx(rename = "step_index")]
1287    pub step_index: i64,
1288    pub step_label: Option<String>,
1289    pub timing_ms: Option<i64>,
1290}
1291
1292/// Flow metadata row from database
1293#[derive(Debug, Clone, sqlx::FromRow)]
1294pub struct FlowMetadataRow {
1295    pub id: String,
1296    pub name: Option<String>,
1297    pub description: Option<String>,
1298    #[sqlx(rename = "created_at")]
1299    pub created_at: String,
1300    pub tags: String,
1301    pub metadata: String,
1302}
1303
1304/// Scenario metadata row from database
1305#[derive(Debug, Clone, sqlx::FromRow)]
1306pub struct ScenarioMetadataRow {
1307    pub id: String,
1308    pub name: String,
1309    pub version: String,
1310    pub description: Option<String>,
1311    #[sqlx(rename = "created_at")]
1312    pub created_at: String,
1313    #[sqlx(rename = "updated_at")]
1314    pub updated_at: String,
1315    pub tags: String,
1316}
1317
/// Internal row representation for sync snapshots
///
/// Holds the raw column values exactly as read from the database; use
/// `to_snapshot` to decode them into a `SyncSnapshot`.
#[derive(Debug)]
struct SyncSnapshotRow {
    /// Snapshot identifier.
    id: String,
    /// Endpoint the snapshot was taken for.
    endpoint: String,
    /// Method the snapshot was taken for.
    method: String,
    /// Identifier of the sync cycle that produced this snapshot.
    sync_cycle_id: String,
    /// Snapshot time as RFC 3339 text (parsed in `to_snapshot`).
    timestamp: String,
    /// Status code before the sync (narrowed to `u16` in `to_snapshot`).
    before_status_code: i32,
    /// Status code after the sync (narrowed to `u16` in `to_snapshot`).
    after_status_code: i32,
    /// Body before the sync, base64-encoded (standard alphabet).
    before_body: String,
    /// Body after the sync, base64-encoded (standard alphabet).
    after_body: String,
    /// Headers before the sync, as a JSON object of string to string.
    before_headers: String,
    /// Headers after the sync, as a JSON object of string to string.
    after_headers: String,
    /// Optional response time before the sync, in milliseconds.
    response_time_before_ms: Option<i64>,
    /// Optional response time after the sync, in milliseconds.
    response_time_after_ms: Option<i64>,
    /// JSON-serialized `crate::diff::ComparisonResult` describing the changes.
    changes_summary: String,
}
1336
1337impl SyncSnapshotRow {
1338    fn to_snapshot(&self) -> Result<crate::sync_snapshots::SyncSnapshot> {
1339        use crate::sync_snapshots::{SnapshotData, SyncSnapshot};
1340        use std::collections::HashMap;
1341
1342        let timestamp = chrono::DateTime::parse_from_rfc3339(&self.timestamp)
1343            .map_err(|e| crate::RecorderError::InvalidFilter(format!("Invalid timestamp: {}", e)))?
1344            .with_timezone(&chrono::Utc);
1345
1346        let before_headers: HashMap<String, String> = serde_json::from_str(&self.before_headers)?;
1347        let after_headers: HashMap<String, String> = serde_json::from_str(&self.after_headers)?;
1348        let changes: crate::diff::ComparisonResult = serde_json::from_str(&self.changes_summary)?;
1349
1350        let before_body = base64::Engine::decode(
1351            &base64::engine::general_purpose::STANDARD,
1352            &self.before_body,
1353        )
1354        .map_err(|e| crate::RecorderError::InvalidFilter(format!("Invalid base64: {}", e)))?;
1355
1356        let after_body =
1357            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, &self.after_body)
1358                .map_err(|e| {
1359                    crate::RecorderError::InvalidFilter(format!("Invalid base64: {}", e))
1360                })?;
1361
1362        let before_body_json = serde_json::from_slice(&before_body).ok();
1363        let after_body_json = serde_json::from_slice(&after_body).ok();
1364
1365        Ok(SyncSnapshot {
1366            id: self.id.clone(),
1367            endpoint: self.endpoint.clone(),
1368            method: self.method.clone(),
1369            sync_cycle_id: self.sync_cycle_id.clone(),
1370            timestamp,
1371            before: SnapshotData {
1372                status_code: self.before_status_code as u16,
1373                headers: before_headers,
1374                body: before_body,
1375                body_json: before_body_json,
1376            },
1377            after: SnapshotData {
1378                status_code: self.after_status_code as u16,
1379                headers: after_headers,
1380                body: after_body,
1381                body_json: after_body_json,
1382            },
1383            changes,
1384            response_time_before: self.response_time_before_ms.map(|v| v as u64),
1385            response_time_after: self.response_time_after_ms.map(|v| v as u64),
1386        })
1387    }
1388}
1389
1390impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for SyncSnapshotRow {
1391    fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
1392        use sqlx::Row;
1393
1394        Ok(SyncSnapshotRow {
1395            id: row.try_get("id")?,
1396            endpoint: row.try_get("endpoint")?,
1397            method: row.try_get("method")?,
1398            sync_cycle_id: row.try_get("sync_cycle_id")?,
1399            timestamp: row.try_get("timestamp")?,
1400            before_status_code: row.try_get("before_status_code")?,
1401            after_status_code: row.try_get("after_status_code")?,
1402            before_body: row.try_get("before_body")?,
1403            after_body: row.try_get("after_body")?,
1404            before_headers: row.try_get("before_headers")?,
1405            after_headers: row.try_get("after_headers")?,
1406            response_time_before_ms: row.try_get("response_time_before_ms")?,
1407            response_time_after_ms: row.try_get("response_time_after_ms")?,
1408            changes_summary: row.try_get("changes_summary")?,
1409        })
1410    }
1411}
1412
/// Database statistics
///
/// Summary counters for the recorder database.
#[derive(Debug, Clone)]
pub struct DatabaseStats {
    /// Number of recorded requests.
    pub total_requests: i64,
    /// Number of recorded responses.
    pub total_responses: i64,
    /// Total size in bytes.
    // NOTE(review): what is measured (response bodies vs. the database file) is
    // not visible from this definition — confirm against the code that fills it.
    pub total_size_bytes: i64,
}
1420
/// Detailed statistics for API
///
/// Request counts broken down by protocol and status code, plus an average
/// duration.
#[derive(Debug, Clone)]
pub struct DetailedStats {
    /// Number of recorded requests.
    pub total_requests: i64,
    /// Request count per protocol (presumably keyed by the protocol's stored name — confirm).
    pub by_protocol: HashMap<String, i64>,
    /// Request count per status code.
    pub by_status_code: HashMap<i32, i64>,
    /// Average request duration in milliseconds, when one is available.
    pub avg_duration_ms: Option<f64>,
}
1429
1430// Implement FromRow for RecordedRequest
1431impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for RecordedRequest {
1432    fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
1433        use sqlx::Row;
1434
1435        Ok(RecordedRequest {
1436            id: row.try_get("id")?,
1437            protocol: row.try_get("protocol")?,
1438            timestamp: row.try_get("timestamp")?,
1439            method: row.try_get("method")?,
1440            path: row.try_get("path")?,
1441            query_params: row.try_get("query_params")?,
1442            headers: row.try_get("headers")?,
1443            body: row.try_get("body")?,
1444            body_encoding: row.try_get("body_encoding")?,
1445            client_ip: row.try_get("client_ip")?,
1446            trace_id: row.try_get("trace_id")?,
1447            span_id: row.try_get("span_id")?,
1448            duration_ms: row.try_get("duration_ms")?,
1449            status_code: row.try_get("status_code")?,
1450            tags: row.try_get("tags")?,
1451        })
1452    }
1453}
1454
1455// Implement FromRow for RecordedResponse
1456impl<'r> sqlx::FromRow<'r, sqlx::sqlite::SqliteRow> for RecordedResponse {
1457    fn from_row(row: &'r sqlx::sqlite::SqliteRow) -> sqlx::Result<Self> {
1458        use sqlx::Row;
1459
1460        Ok(RecordedResponse {
1461            request_id: row.try_get("request_id")?,
1462            status_code: row.try_get("status_code")?,
1463            headers: row.try_get("headers")?,
1464            body: row.try_get("body")?,
1465            body_encoding: row.try_get("body_encoding")?,
1466            size_bytes: row.try_get("size_bytes")?,
1467            timestamp: row.try_get("timestamp")?,
1468        })
1469    }
1470}
1471
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// A freshly created in-memory database reports zero recorded requests.
    #[tokio::test]
    async fn test_database_creation() {
        let database = RecorderDatabase::new_in_memory().await.unwrap();
        let total = database.get_stats().await.unwrap().total_requests;
        assert_eq!(total, 0);
    }

    /// A request written with `insert_request` can be read back by its id.
    #[tokio::test]
    async fn test_insert_and_get_request() {
        let database = RecorderDatabase::new_in_memory().await.unwrap();

        let sample = RecordedRequest {
            id: String::from("test-123"),
            protocol: Protocol::Http,
            timestamp: Utc::now(),
            method: String::from("GET"),
            path: String::from("/api/test"),
            query_params: None,
            headers: String::from("{}"),
            body: None,
            body_encoding: String::from("utf8"),
            client_ip: Some(String::from("127.0.0.1")),
            trace_id: None,
            span_id: None,
            duration_ms: Some(42),
            status_code: Some(200),
            tags: None,
        };
        database.insert_request(&sample).await.unwrap();

        let fetched = database.get_request("test-123").await.unwrap();
        assert!(fetched.is_some());

        let stored = fetched.unwrap();
        assert_eq!(stored.path, "/api/test");
    }
}