codex_memory/memory/
repository.rs

1use super::error::{MemoryError, Result};
2use super::event_triggers::EventTriggeredScoringEngine;
3use super::math_engine::constants;
4use super::models::*;
5use crate::config::Config;
6use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
7use base64::Engine;
8use chrono::{DateTime, Utc};
9use pgvector::Vector;
10use sqlx::postgres::types::PgInterval;
11use sqlx::{PgPool, Postgres, Row, Transaction};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tracing::{debug, info, warn};
16use uuid::Uuid;
17
/// Data-access layer for memories, backed by a PostgreSQL pool.
pub struct MemoryRepository {
    // Shared connection pool; cloned where sub-components need their own handle.
    pool: PgPool,
    // Optional engine that boosts importance scores when content matches trigger events.
    trigger_engine: Option<Arc<EventTriggeredScoringEngine>>,
    // Optional application config (used for working-tier capacity enforcement).
    config: Option<Config>,
}
23
/// Safe query builder to prevent SQL injection vulnerabilities.
///
/// SQL fragments are appended only from trusted, compile-time strings;
/// all runtime values are collected as typed parameters and bound via sqlx.
#[derive(Debug, Clone)]
pub struct SafeQueryBuilder {
    // Trusted SQL fragments, joined with spaces by `build_query`.
    query_parts: Vec<String>,
    // Values to bind, in the order their `$N` placeholders were allocated.
    parameters: Vec<QueryParameter>,
    // Next `$N` placeholder index to allocate (1-based, per PostgreSQL).
    bind_index: usize,
}
31
/// Typed holder for a value awaiting parameterized binding.
/// One variant per SQL type the builder knows how to bind.
#[derive(Debug, Clone)]
enum QueryParameter {
    Text(String),
    Integer(i64),
    Float(f64),
    DateTime(DateTime<Utc>),
    Tier(MemoryTier),
    Uuid(Uuid),
    Vector(Vector),
}
42
43impl SafeQueryBuilder {
44    pub fn new(base_query: &str) -> Self {
45        Self {
46            query_parts: vec![base_query.to_string()],
47            parameters: Vec::new(),
48            bind_index: 1,
49        }
50    }
51
52    /// Add a parameterized condition safely
53    pub fn add_condition(&mut self, condition: &str) -> &mut Self {
54        self.query_parts.push(condition.to_string());
55        self
56    }
57
58    /// Add a parameterized tier filter
59    pub fn add_tier_filter(&mut self, tier: &MemoryTier) -> &mut Self {
60        let condition = format!("AND m.tier = ${}", self.bind_index);
61        self.query_parts.push(condition);
62        self.parameters.push(QueryParameter::Tier(*tier));
63        self.bind_index += 1;
64        self
65    }
66
67    /// Add a parameterized date range filter
68    pub fn add_date_range(
69        &mut self,
70        start: Option<&DateTime<Utc>>,
71        end: Option<&DateTime<Utc>>,
72    ) -> &mut Self {
73        if let Some(start_date) = start {
74            let condition = format!("AND m.created_at >= ${}", self.bind_index);
75            self.query_parts.push(condition);
76            self.parameters.push(QueryParameter::DateTime(*start_date));
77            self.bind_index += 1;
78        }
79        if let Some(end_date) = end {
80            let condition = format!("AND m.created_at <= ${}", self.bind_index);
81            self.query_parts.push(condition);
82            self.parameters.push(QueryParameter::DateTime(*end_date));
83            self.bind_index += 1;
84        }
85        self
86    }
87
88    /// Add a parameterized importance range filter
89    pub fn add_importance_range(&mut self, min: Option<f64>, max: Option<f64>) -> &mut Self {
90        if let Some(min_score) = min {
91            let condition = format!("AND m.importance_score >= ${}", self.bind_index);
92            self.query_parts.push(condition);
93            self.parameters.push(QueryParameter::Float(min_score));
94            self.bind_index += 1;
95        }
96        if let Some(max_score) = max {
97            let condition = format!("AND m.importance_score <= ${}", self.bind_index);
98            self.query_parts.push(condition);
99            self.parameters.push(QueryParameter::Float(max_score));
100            self.bind_index += 1;
101        }
102        self
103    }
104
105    /// Add a parameterized similarity threshold
106    pub fn add_similarity_threshold(&mut self, threshold: f64) -> &mut Self {
107        let condition = format!("AND (1 - (m.embedding <=> $1)) >= ${}", self.bind_index);
108        self.query_parts.push(condition);
109        self.parameters.push(QueryParameter::Float(threshold));
110        self.bind_index += 1;
111        self
112    }
113
114    /// Add consolidation strength range filter
115    pub fn add_consolidation_strength_range(
116        &mut self,
117        min: Option<f64>,
118        max: Option<f64>,
119    ) -> &mut Self {
120        if let Some(min_strength) = min {
121            let condition = format!("AND consolidation_strength >= ${}", self.bind_index);
122            self.query_parts.push(condition);
123            self.parameters.push(QueryParameter::Float(min_strength));
124            self.bind_index += 1;
125        }
126        if let Some(max_strength) = max {
127            let condition = format!("AND consolidation_strength <= ${}", self.bind_index);
128            self.query_parts.push(condition);
129            self.parameters.push(QueryParameter::Float(max_strength));
130            self.bind_index += 1;
131        }
132        self
133    }
134
135    /// Add recall probability range filter
136    pub fn add_recall_probability_range(
137        &mut self,
138        min: Option<f64>,
139        max: Option<f64>,
140    ) -> &mut Self {
141        if let Some(min_recall) = min {
142            let condition = format!("AND recall_probability >= ${}", self.bind_index);
143            self.query_parts.push(condition);
144            self.parameters.push(QueryParameter::Float(min_recall));
145            self.bind_index += 1;
146        }
147        if let Some(max_recall) = max {
148            let condition = format!("AND recall_probability <= ${}", self.bind_index);
149            self.query_parts.push(condition);
150            self.parameters.push(QueryParameter::Float(max_recall));
151            self.bind_index += 1;
152        }
153        self
154    }
155
156    /// Add condition to exclude frozen tier if needed
157    pub fn add_exclude_frozen(&mut self, exclude: bool) -> &mut Self {
158        if exclude {
159            self.query_parts.push("AND tier != 'frozen'".to_string());
160        }
161        self
162    }
163
164    /// Add a safe time interval condition for last access
165    pub fn add_last_access_interval(&mut self, hours: f64) -> &mut Self {
166        let condition = format!(
167            "AND (last_accessed_at IS NULL OR last_accessed_at < NOW() - INTERVAL '${} hours')",
168            self.bind_index
169        );
170        self.query_parts.push(condition);
171        self.parameters
172            .push(QueryParameter::Text(hours.to_string()));
173        self.bind_index += 1;
174        self
175    }
176
177    /// Add recall probability threshold condition
178    pub fn add_recall_threshold_condition(&mut self, threshold: f64) -> &mut Self {
179        // Store the threshold parameter for use in ORDER BY clause
180        self.parameters.push(QueryParameter::Float(threshold));
181        self.bind_index += 1;
182        self
183    }
184
185    /// Add a limit and offset with validation
186    pub fn add_pagination(&mut self, limit: usize, offset: usize) -> Result<&mut Self> {
187        // Input validation: reasonable limits to prevent resource exhaustion
188        if limit > 10000 {
189            return Err(MemoryError::InvalidRequest {
190                message: "Limit cannot exceed 10000 for performance reasons".to_string(),
191            });
192        }
193        if offset > 1000000 {
194            return Err(MemoryError::InvalidRequest {
195                message: "Offset cannot exceed 1000000 for performance reasons".to_string(),
196            });
197        }
198
199        let condition = format!("LIMIT ${} OFFSET ${}", self.bind_index, self.bind_index + 1);
200        self.query_parts.push(condition);
201        self.parameters.push(QueryParameter::Integer(limit as i64));
202        self.parameters.push(QueryParameter::Integer(offset as i64));
203        self.bind_index += 2;
204        Ok(self)
205    }
206
207    /// Build the final query string
208    pub fn build_query(&self) -> String {
209        self.query_parts.join(" ")
210    }
211
212    /// Apply parameters to a sqlx query
213    pub fn bind_parameters<'a>(
214        &'a self,
215        mut query: sqlx::query::Query<'a, Postgres, sqlx::postgres::PgArguments>,
216    ) -> sqlx::query::Query<'a, Postgres, sqlx::postgres::PgArguments> {
217        for param in &self.parameters {
218            query = match param {
219                QueryParameter::Text(s) => query.bind(s),
220                QueryParameter::Integer(i) => query.bind(*i),
221                QueryParameter::Float(f) => query.bind(*f),
222                QueryParameter::DateTime(dt) => query.bind(*dt),
223                QueryParameter::Tier(tier) => query.bind(tier),
224                QueryParameter::Uuid(uuid) => query.bind(*uuid),
225                QueryParameter::Vector(vec) => query.bind(vec),
226            };
227        }
228        query
229    }
230
231    /// Apply parameters to a sqlx query_as
232    pub fn bind_parameters_as<'a, T>(
233        &'a self,
234        mut query: sqlx::query::QueryAs<'a, Postgres, T, sqlx::postgres::PgArguments>,
235    ) -> sqlx::query::QueryAs<'a, Postgres, T, sqlx::postgres::PgArguments>
236    where
237        T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
238    {
239        for param in &self.parameters {
240            query = match param {
241                QueryParameter::Text(s) => query.bind(s),
242                QueryParameter::Integer(i) => query.bind(*i),
243                QueryParameter::Float(f) => query.bind(*f),
244                QueryParameter::DateTime(dt) => query.bind(*dt),
245                QueryParameter::Tier(tier) => query.bind(tier),
246                QueryParameter::Uuid(uuid) => query.bind(*uuid),
247                QueryParameter::Vector(vec) => query.bind(vec),
248            };
249        }
250        query
251    }
252}
253
254impl MemoryRepository {
255    pub fn new(pool: PgPool) -> Self {
256        Self {
257            pool,
258            trigger_engine: None,
259            config: None,
260        }
261    }
262
263    pub fn with_config(pool: PgPool, config: Config) -> Self {
264        Self {
265            pool,
266            trigger_engine: None,
267            config: Some(config),
268        }
269    }
270
271    pub fn with_trigger_engine(
272        pool: PgPool,
273        trigger_engine: Arc<EventTriggeredScoringEngine>,
274    ) -> Self {
275        Self {
276            pool,
277            trigger_engine: Some(trigger_engine),
278            config: None,
279        }
280    }
281
282    pub fn with_config_and_trigger_engine(
283        pool: PgPool,
284        config: Config,
285        trigger_engine: Arc<EventTriggeredScoringEngine>,
286    ) -> Self {
287        Self {
288            pool,
289            trigger_engine: Some(trigger_engine),
290            config: Some(config),
291        }
292    }
293
    /// Borrow the underlying PostgreSQL connection pool.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
297
298    pub async fn create_memory(&self, request: CreateMemoryRequest) -> Result<Memory> {
299        self.create_memory_with_user_context(request, None).await
300    }
301
302    pub async fn create_memory_with_user_context(
303        &self,
304        request: CreateMemoryRequest,
305        user_id: Option<&str>,
306    ) -> Result<Memory> {
307        let id = Uuid::new_v4();
308        let content_hash = Memory::calculate_content_hash(&request.content);
309        let tier = request.tier.unwrap_or(MemoryTier::Working);
310
311        // Check for duplicates (skip in test mode)
312        let skip_duplicate_check =
313            std::env::var("SKIP_DUPLICATE_CHECK").unwrap_or_else(|_| "false".to_string()) == "true";
314
315        if !skip_duplicate_check {
316            let duplicate_exists = sqlx::query_scalar::<_, bool>(
317                "SELECT EXISTS(SELECT 1 FROM memories WHERE content_hash = $1 AND tier = $2 AND status = 'active')"
318            )
319            .bind(&content_hash)
320            .bind(tier)
321            .fetch_one(&self.pool)
322            .await?;
323
324            if duplicate_exists {
325                return Err(MemoryError::DuplicateContent {
326                    tier: format!("{tier:?}"),
327                });
328            }
329        }
330
331        // Check working memory capacity limits (Miller's 7±2 principle)
332        if tier == MemoryTier::Working {
333            if let Some(ref config) = self.config {
334                let working_count: i64 = sqlx::query_scalar(
335                    "SELECT COUNT(*) FROM memories WHERE tier = 'working' AND status = 'active'",
336                )
337                .fetch_one(&self.pool)
338                .await?;
339
340                if working_count >= config.tier_config.working_tier_limit as i64 {
341                    // Working memory at capacity - need to evict or reject
342                    info!(
343                        "Working memory at capacity ({}/{}), applying LRU eviction",
344                        working_count, config.tier_config.working_tier_limit
345                    );
346
347                    // Find the least recently used memory in working tier
348                    let lru_memory_id: Option<Uuid> = sqlx::query_scalar(
349                        "SELECT id FROM memories 
350                         WHERE tier = 'working' AND status = 'active'
351                         ORDER BY last_accessed ASC
352                         LIMIT 1",
353                    )
354                    .fetch_optional(&self.pool)
355                    .await?;
356
357                    if let Some(memory_id) = lru_memory_id {
358                        // Migrate LRU memory to warm tier
359                        sqlx::query(
360                            "UPDATE memories SET tier = 'warm', updated_at = NOW() 
361                             WHERE id = $1",
362                        )
363                        .bind(memory_id)
364                        .execute(&self.pool)
365                        .await?;
366
367                        info!(
368                            "Evicted LRU memory {} from working to warm tier due to capacity limit",
369                            memory_id
370                        );
371
372                        // Track eviction metric (if metrics are available)
373                        // This would be integrated with the MetricsCollector in production
374                    } else {
375                        // This shouldn't happen but handle gracefully
376                        return Err(MemoryError::StorageExhausted {
377                            tier: "working".to_string(),
378                            limit: config.tier_config.working_tier_limit,
379                        });
380                    }
381                }
382            }
383        }
384
385        // Apply event-triggered scoring if available
386        let (final_importance_score, trigger_result) = if let Some(trigger_engine) =
387            &self.trigger_engine
388        {
389            let original_importance = request.importance_score.unwrap_or(0.5);
390
391            match trigger_engine
392                .analyze_content(&request.content, original_importance, user_id)
393                .await
394            {
395                Ok(result) => {
396                    if result.triggered {
397                        info!(
398                            "Memory triggered event: {:?} (confidence: {:.2}, boosted: {:.2} -> {:.2})",
399                            result.trigger_type, result.confidence, result.original_importance, result.boosted_importance
400                        );
401                        (result.boosted_importance, Some(result))
402                    } else {
403                        (original_importance, Some(result))
404                    }
405                }
406                Err(e) => {
407                    warn!("Failed to analyze content for triggers: {}", e);
408                    (request.importance_score.unwrap_or(0.5), None)
409                }
410            }
411        } else {
412            (request.importance_score.unwrap_or(0.5), None)
413        };
414
415        let embedding = request.embedding.map(Vector::from);
416
417        // Add trigger metadata if triggered
418        let mut metadata = request.metadata.unwrap_or(serde_json::json!({}));
419        if let Some(trigger_result) = &trigger_result {
420            if trigger_result.triggered {
421                metadata["trigger_info"] = serde_json::json!({
422                    "triggered": true,
423                    "trigger_type": trigger_result.trigger_type,
424                    "confidence": trigger_result.confidence,
425                    "original_importance": trigger_result.original_importance,
426                    "boosted_importance": trigger_result.boosted_importance,
427                    "processing_time_ms": trigger_result.processing_time.as_millis()
428                });
429            }
430        }
431
432        let memory = sqlx::query_as::<_, Memory>(
433            r#"
434            INSERT INTO memories (
435                id, content, content_hash, embedding, tier, status, 
436                importance_score, metadata, parent_id, expires_at,
437                consolidation_strength, decay_rate
438            )
439            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
440            RETURNING *
441            "#,
442        )
443        .bind(id)
444        .bind(&request.content)
445        .bind(&content_hash)
446        .bind(embedding)
447        .bind(tier)
448        .bind(MemoryStatus::Active)
449        .bind(final_importance_score)
450        .bind(metadata)
451        .bind(request.parent_id)
452        .bind(request.expires_at)
453        .bind(1.0_f64) // Default consolidation_strength
454        .bind(1.0_f64) // Default decay_rate
455        .fetch_one(&self.pool)
456        .await?;
457
458        info!(
459            "Created memory {} in tier {:?} with importance {:.2}",
460            memory.id, memory.tier, final_importance_score
461        );
462        Ok(memory)
463    }
464
465    pub async fn get_memory(&self, id: Uuid) -> Result<Memory> {
466        let memory = sqlx::query_as::<_, Memory>(
467            r#"
468            UPDATE memories 
469            SET access_count = access_count + 1, 
470                last_accessed_at = NOW()
471            WHERE id = $1 AND status = 'active'
472            RETURNING *
473            "#,
474        )
475        .bind(id)
476        .fetch_optional(&self.pool)
477        .await?
478        .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;
479
480        debug!("Retrieved memory {} from tier {:?}", id, memory.tier);
481        Ok(memory)
482    }
483
    /// Update an existing active memory inside a single transaction.
    ///
    /// Fields absent from `request` keep their current values. If the tier
    /// changes, a migration record is written in the same transaction.
    ///
    /// # Errors
    /// `MemoryError::NotFound` when no active memory with `id` exists.
    pub async fn update_memory(&self, id: Uuid, request: UpdateMemoryRequest) -> Result<Memory> {
        let mut tx = self.pool.begin().await?;

        // Get current memory (FOR UPDATE row-locks it so concurrent updates serialize)
        let current = sqlx::query_as::<_, Memory>(
            "SELECT * FROM memories WHERE id = $1 AND status = 'active' FOR UPDATE",
        )
        .bind(id)
        .fetch_optional(&mut *tx)
        .await?
        .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;

        // Update fields — each falls back to the current value when not provided
        let content = request.content.as_ref().unwrap_or(&current.content);
        // Only re-hash when the content actually changed
        let content_hash = if request.content.is_some() {
            Memory::calculate_content_hash(content)
        } else {
            current.content_hash.clone()
        };

        let embedding = request.embedding.map(Vector::from).or(current.embedding);
        let tier = request.tier.unwrap_or(current.tier);
        let importance_score = request.importance_score.unwrap_or(current.importance_score);
        let metadata = request.metadata.as_ref().unwrap_or(&current.metadata);
        let expires_at = request.expires_at.or(current.expires_at);

        let updated = sqlx::query_as::<_, Memory>(
            r#"
            UPDATE memories 
            SET content = $2, content_hash = $3, embedding = $4, tier = $5,
                importance_score = $6, metadata = $7, expires_at = $8,
                updated_at = NOW()
            WHERE id = $1
            RETURNING *
            "#,
        )
        .bind(id)
        .bind(content)
        .bind(&content_hash)
        .bind(embedding)
        .bind(tier)
        .bind(importance_score)
        .bind(metadata)
        .bind(expires_at)
        .fetch_one(&mut *tx)
        .await?;

        // Record tier migration if changed — same transaction, so the
        // migration record and the update commit or roll back together
        if current.tier != tier {
            self.record_migration(
                &mut tx,
                id,
                current.tier,
                tier,
                Some("Manual update".to_string()),
            )
            .await?;
        }

        tx.commit().await?;
        info!("Updated memory {}", id);
        Ok(updated)
    }
547
548    pub async fn delete_memory(&self, id: Uuid) -> Result<()> {
549        let result = sqlx::query(
550            "UPDATE memories SET status = 'deleted' WHERE id = $1 AND status = 'active'",
551        )
552        .bind(id)
553        .execute(&self.pool)
554        .await?;
555
556        if result.rows_affected() == 0 {
557            return Err(MemoryError::NotFound { id: id.to_string() });
558        }
559
560        info!("Soft deleted memory {}", id);
561        Ok(())
562    }
563
564    /// Enhanced search method with memory-aware features for Story 9
565    pub async fn search_memories_enhanced(
566        &self,
567        request: crate::memory::enhanced_retrieval::MemoryAwareSearchRequest,
568    ) -> Result<crate::memory::enhanced_retrieval::MemoryAwareSearchResponse> {
569        use crate::memory::enhanced_retrieval::*;
570
571        let config = EnhancedRetrievalConfig::default();
572        let retrieval_engine = MemoryAwareRetrievalEngine::new(
573            config,
574            std::sync::Arc::new(MemoryRepository::new(self.pool.clone())),
575            None,
576        );
577
578        retrieval_engine.search(request).await
579    }
580
581    pub async fn search_memories(&self, request: SearchRequest) -> Result<SearchResponse> {
582        let start_time = Instant::now();
583
584        let search_type = request
585            .search_type
586            .as_ref()
587            .unwrap_or(&SearchType::Semantic)
588            .clone();
589        let limit = request.limit.unwrap_or(10);
590        let offset = request.offset.unwrap_or(0);
591
592        let results = match search_type {
593            SearchType::Semantic => self.semantic_search(&request).await?,
594            SearchType::Temporal => self.temporal_search(&request).await?,
595            SearchType::Hybrid => self.hybrid_search(&request).await?,
596            SearchType::FullText => self.fulltext_search(&request).await?,
597        };
598
599        let total_count = if request.include_facets.unwrap_or(false) {
600            Some(self.count_search_results(&request).await?)
601        } else {
602            None
603        };
604
605        let facets = if request.include_facets.unwrap_or(false) {
606            Some(self.generate_search_facets(&request).await?)
607        } else {
608            None
609        };
610
611        let suggestions = if request.query_text.is_some() {
612            Some(self.generate_query_suggestions(&request).await?)
613        } else {
614            None
615        };
616
617        let next_cursor = if results.len() as i32 >= limit {
618            Some(self.generate_cursor(offset + limit as i64, &request))
619        } else {
620            None
621        };
622
623        let execution_time_ms = start_time.elapsed().as_millis() as u64;
624
625        Ok(SearchResponse {
626            results,
627            total_count,
628            facets,
629            suggestions,
630            next_cursor,
631            execution_time_ms,
632        })
633    }
634
635    async fn semantic_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
636        let query_embedding = if let Some(ref embedding) = request.query_embedding {
637            Vector::from(embedding.clone())
638        } else {
639            return Err(MemoryError::InvalidRequest {
640                message: "Query embedding is required for semantic search".to_string(),
641            });
642        };
643
644        let limit = request.limit.unwrap_or(10);
645        let offset = request.offset.unwrap_or(0);
646        let threshold = request.similarity_threshold.unwrap_or(0.7);
647
648        // Use safe query builder to prevent SQL injection
649        let mut builder = SafeQueryBuilder::new(
650            "SELECT m.*, 1 - (m.embedding <=> $1) as similarity_score FROM memories m WHERE m.status = 'active' AND m.embedding IS NOT NULL"
651        );
652        // Set bind index to 2 since $1 is already used for the query embedding
653        builder.bind_index = 2;
654
655        // Add filters safely
656        self.add_filters_safe(request, &mut builder)?;
657
658        // Add similarity threshold safely
659        builder.add_similarity_threshold(threshold as f64);
660
661        // Add ordering and pagination
662        builder.add_condition("ORDER BY similarity_score DESC");
663        builder.add_pagination(limit as usize, offset as usize)?;
664
665        // Build query and execute with parameterized binding
666        let query = builder.build_query();
667        let mut sqlx_query = sqlx::query(&query).bind(&query_embedding);
668        sqlx_query = builder.bind_parameters(sqlx_query);
669
670        let rows = sqlx_query.fetch_all(&self.pool).await?;
671
672        self.build_search_results(rows, request).await
673    }
674
675    async fn temporal_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
676        let limit = request.limit.unwrap_or(10);
677        let offset = request.offset.unwrap_or(0);
678
679        // Use safe query builder to prevent SQL injection
680        let mut builder = SafeQueryBuilder::new(
681            "SELECT m.*, 0.0 as similarity_score FROM memories m WHERE m.status = 'active'",
682        );
683
684        // Add filters safely
685        self.add_filters_safe(request, &mut builder)?;
686
687        // Add ordering and pagination
688        builder.add_condition("ORDER BY m.created_at DESC, m.updated_at DESC");
689        builder.add_pagination(limit as usize, offset as usize)?;
690
691        // Build query and execute with parameterized binding
692        let query = builder.build_query();
693        let sqlx_query = builder.bind_parameters(sqlx::query(&query));
694
695        let rows = sqlx_query.fetch_all(&self.pool).await?;
696
697        self.build_search_results(rows, request).await
698    }
699
700    async fn hybrid_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
701        // Use three-component scoring weights (default: equal weighting)
702        let _weights = request.hybrid_weights.as_ref().unwrap_or(&HybridWeights {
703            semantic_weight: 0.333,
704            temporal_weight: 0.333, // Maps to recency_score
705            importance_weight: 0.334,
706            access_frequency_weight: 0.0, // Included in relevance_score
707        });
708
709        let query_embedding = if let Some(ref embedding) = request.query_embedding {
710            Vector::from(embedding.clone())
711        } else {
712            return Err(MemoryError::InvalidRequest {
713                message: "Query embedding is required for hybrid search".to_string(),
714            });
715        };
716
717        let limit = request.limit.unwrap_or(10);
718        let offset = request.offset.unwrap_or(0);
719        let threshold = request.similarity_threshold.unwrap_or(0.5);
720
721        // Update component scores which will automatically update the generated combined_score
722        sqlx::query(
723            r#"
724            UPDATE memories 
725            SET recency_score = calculate_recency_score(last_accessed_at, created_at, 0.005),
726                relevance_score = LEAST(1.0, 
727                    0.5 * importance_score + 
728                    0.3 * LEAST(1.0, access_count / 100.0) + 
729                    0.2
730                )
731            WHERE status = 'active' AND embedding IS NOT NULL
732            "#,
733        )
734        .execute(&self.pool)
735        .await?;
736
737        // Use the generated combined_score column for optimal P99 <1ms performance
738        let query = format!(
739            r#"
740            SELECT m.*,
741                1 - (m.embedding <=> $1) as similarity_score,
742                m.recency_score as temporal_score,
743                m.importance_score,
744                m.relevance_score,
745                COALESCE(m.access_count, 0) as access_count,
746                m.combined_score as combined_score
747            FROM memories m
748            WHERE m.status = 'active'
749                AND m.embedding IS NOT NULL
750                AND 1 - (m.embedding <=> $1) >= {threshold}
751            ORDER BY m.combined_score DESC, similarity_score DESC
752            LIMIT {limit} OFFSET {offset}
753            "#
754        );
755
756        let rows = sqlx::query(&query)
757            .bind(&query_embedding)
758            .fetch_all(&self.pool)
759            .await?;
760
761        self.build_search_results(rows, request).await
762    }
763
764    async fn fulltext_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
765        let query_text =
766            request
767                .query_text
768                .as_ref()
769                .ok_or_else(|| MemoryError::InvalidRequest {
770                    message: "Query text is required for full-text search".to_string(),
771                })?;
772
773        let limit = request.limit.unwrap_or(10);
774        let offset = request.offset.unwrap_or(0);
775
776        let query = format!(
777            r#"
778            SELECT m.*,
779                ts_rank_cd(to_tsvector('english', m.content), plainto_tsquery('english', $1)) as similarity_score
780            FROM memories m
781            WHERE m.status = 'active'
782                AND to_tsvector('english', m.content) @@ plainto_tsquery('english', $1)
783            ORDER BY similarity_score DESC
784            LIMIT {limit} OFFSET {offset}
785            "#
786        );
787
788        let rows = sqlx::query(&query)
789            .bind(query_text)
790            .fetch_all(&self.pool)
791            .await?;
792
793        self.build_search_results(rows, request).await
794    }
795
796    /// Safe version of add_filters using SafeQueryBuilder to prevent SQL injection
797    fn add_filters_safe(
798        &self,
799        request: &SearchRequest,
800        builder: &mut SafeQueryBuilder,
801    ) -> Result<()> {
802        if let Some(tier) = &request.tier {
803            builder.add_tier_filter(tier);
804        }
805
806        if let Some(date_range) = &request.date_range {
807            builder.add_date_range(date_range.start.as_ref(), date_range.end.as_ref());
808        }
809
810        if let Some(importance_range) = &request.importance_range {
811            builder.add_importance_range(
812                importance_range.min.map(|v| v as f64),
813                importance_range.max.map(|v| v as f64),
814            );
815        }
816
817        Ok(())
818    }
819
    /// Map raw Postgres rows into `SearchResult`s.
    ///
    /// Each row must carry every `memories` column; the computed scoring
    /// columns (`similarity_score`, `combined_score`, `temporal_score`,
    /// `access_frequency_score`) are optional and fall back to 0.0 (or, for
    /// `combined_score`, to the similarity score). When
    /// `request.explain_score` is true, a per-result breakdown with fixed
    /// weights (semantic 0.4, temporal 0.3, importance 0.2, access 0.1) is
    /// attached.
    ///
    /// # Errors
    /// Returns an error if any mandatory `memories` column is missing or
    /// has an unexpected type.
    async fn build_search_results(
        &self,
        rows: Vec<sqlx::postgres::PgRow>,
        request: &SearchRequest,
    ) -> Result<Vec<SearchResult>> {
        let mut results = Vec::new();
        let explain_score = request.explain_score.unwrap_or(false);

        for row in rows {
            // Mandatory columns use `?` (hard error); consolidation/scoring
            // columns default so older rows without them still hydrate.
            let memory = Memory {
                id: row.try_get("id")?,
                content: row.try_get("content")?,
                content_hash: row.try_get("content_hash")?,
                embedding: row.try_get("embedding")?,
                tier: row.try_get("tier")?,
                status: row.try_get("status")?,
                importance_score: row.try_get("importance_score")?,
                access_count: row.try_get("access_count")?,
                last_accessed_at: row.try_get("last_accessed_at")?,
                metadata: row.try_get("metadata")?,
                parent_id: row.try_get("parent_id")?,
                created_at: row.try_get("created_at")?,
                updated_at: row.try_get("updated_at")?,
                expires_at: row.try_get("expires_at")?,
                consolidation_strength: row.try_get("consolidation_strength").unwrap_or(1.0),
                decay_rate: row.try_get("decay_rate").unwrap_or(1.0),
                recall_probability: row.try_get("recall_probability")?,
                last_recall_interval: row.try_get("last_recall_interval")?,
                recency_score: row.try_get("recency_score").unwrap_or(0.0),
                relevance_score: row.try_get("relevance_score").unwrap_or(0.0),
            };

            // Score columns are produced by some queries only; default when absent.
            let similarity_score: f32 = row.try_get("similarity_score").unwrap_or(0.0);
            let combined_score: f32 = row.try_get("combined_score").unwrap_or(similarity_score);
            let temporal_score: Option<f32> = row.try_get("temporal_score").ok();
            let access_frequency_score: Option<f32> = row.try_get("access_frequency_score").ok();
            let importance_score = memory.importance_score; // Extract before move

            let score_explanation = if explain_score {
                // Fixed weighting: 0.4 semantic + 0.3 temporal + 0.2 importance
                // + 0.1 access frequency.
                Some(ScoreExplanation {
                    semantic_contribution: similarity_score * 0.4,
                    temporal_contribution: temporal_score.unwrap_or(0.0) * 0.3,
                    importance_contribution: (importance_score * 0.2) as f32,
                    access_frequency_contribution: access_frequency_score.unwrap_or(0.0) * 0.1,
                    total_score: combined_score,
                    factors: vec![
                        "semantic similarity".to_string(),
                        "recency".to_string(),
                        "importance".to_string(),
                    ],
                })
            } else {
                None
            };

            results.push(SearchResult {
                memory,
                similarity_score,
                temporal_score,
                importance_score,
                access_frequency_score,
                combined_score,
                score_explanation,
            });
        }

        debug!("Built {} search results", results.len());
        Ok(results)
    }
889
890    async fn count_search_results(&self, _request: &SearchRequest) -> Result<i64> {
891        // Simplified count - would implement filtering logic similar to search
892        let count: i64 =
893            sqlx::query_scalar("SELECT COUNT(*) FROM memories WHERE status = 'active'")
894                .fetch_one(&self.pool)
895                .await?;
896        Ok(count)
897    }
898
899    async fn generate_search_facets(&self, _request: &SearchRequest) -> Result<SearchFacets> {
900        // Generate tier facets
901        let tier_rows: Vec<(String, i64)> = sqlx::query_as(
902            "SELECT tier, COUNT(*) FROM memories WHERE status = 'active' GROUP BY tier",
903        )
904        .fetch_all(&self.pool)
905        .await?;
906
907        let mut tiers = HashMap::new();
908        for (tier_str, count) in tier_rows {
909            if let Ok(tier) = tier_str.parse::<MemoryTier>() {
910                tiers.insert(tier, count);
911            }
912        }
913
914        // Generate date histogram (simplified)
915        let date_histogram = vec![DateBucket {
916            date: Utc::now(),
917            count: 10,
918            interval: "day".to_string(),
919        }];
920
921        // Generate importance ranges
922        let importance_ranges = vec![
923            ImportanceRange {
924                min: 0.0,
925                max: 0.3,
926                count: 5,
927                label: "Low".to_string(),
928            },
929            ImportanceRange {
930                min: 0.3,
931                max: 0.7,
932                count: 15,
933                label: "Medium".to_string(),
934            },
935            ImportanceRange {
936                min: 0.7,
937                max: 1.0,
938                count: 8,
939                label: "High".to_string(),
940            },
941        ];
942
943        Ok(SearchFacets {
944            tiers,
945            date_histogram,
946            importance_ranges,
947            tags: HashMap::new(), // Would extract from metadata
948        })
949    }
950
951    async fn generate_query_suggestions(&self, _request: &SearchRequest) -> Result<Vec<String>> {
952        // Simplified implementation - would use ML model or query history
953        Ok(vec![
954            "recent code changes".to_string(),
955            "function definitions".to_string(),
956            "error handling patterns".to_string(),
957        ])
958    }
959
960    fn generate_cursor(&self, offset: i64, _request: &SearchRequest) -> String {
961        // Simple cursor implementation - would encode more search state in production
962        use base64::{engine::general_purpose::STANDARD, Engine};
963        STANDARD.encode(format!("offset:{offset}"))
964    }
965
966    // Legacy method for backward compatibility
967    pub async fn search_memories_simple(
968        &self,
969        request: SearchRequest,
970    ) -> Result<Vec<SearchResult>> {
971        let response = self.search_memories(request).await?;
972        Ok(response.results)
973    }
974
975    pub async fn get_memories_by_tier(
976        &self,
977        tier: MemoryTier,
978        limit: Option<i64>,
979    ) -> Result<Vec<Memory>> {
980        let limit = limit.unwrap_or(100);
981
982        let memories = sqlx::query_as::<_, Memory>(
983            r#"
984            SELECT * FROM memories 
985            WHERE tier = $1 AND status = 'active'
986            ORDER BY importance_score DESC, updated_at DESC
987            LIMIT $2
988            "#,
989        )
990        .bind(tier)
991        .bind(limit)
992        .fetch_all(&self.pool)
993        .await?;
994
995        Ok(memories)
996    }
997
998    pub async fn migrate_memory(
999        &self,
1000        id: Uuid,
1001        to_tier: MemoryTier,
1002        reason: Option<String>,
1003    ) -> Result<Memory> {
1004        let mut tx = self.pool.begin().await?;
1005
1006        // Get current memory with lock
1007        let current = sqlx::query_as::<_, Memory>(
1008            "SELECT * FROM memories WHERE id = $1 AND status = 'active' FOR UPDATE",
1009        )
1010        .bind(id)
1011        .fetch_optional(&mut *tx)
1012        .await?
1013        .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;
1014
1015        if current.tier == to_tier {
1016            return Ok(current);
1017        }
1018
1019        // Validate tier transition
1020        let valid_transition = match (current.tier, to_tier) {
1021            (MemoryTier::Working, MemoryTier::Warm)
1022            | (MemoryTier::Working, MemoryTier::Cold)
1023            | (MemoryTier::Warm, MemoryTier::Cold)
1024            | (MemoryTier::Warm, MemoryTier::Working)
1025            | (MemoryTier::Cold, MemoryTier::Warm) => true,
1026            _ => false,
1027        };
1028
1029        if !valid_transition {
1030            return Err(MemoryError::InvalidTierTransition {
1031                from: format!("{:?}", current.tier),
1032                to: format!("{to_tier:?}"),
1033            });
1034        }
1035
1036        let start = std::time::Instant::now();
1037
1038        // Update tier
1039        let updated = sqlx::query_as::<_, Memory>(
1040            r#"
1041            UPDATE memories 
1042            SET tier = $2, status = 'active', updated_at = NOW()
1043            WHERE id = $1
1044            RETURNING *
1045            "#,
1046        )
1047        .bind(id)
1048        .bind(to_tier)
1049        .fetch_one(&mut *tx)
1050        .await?;
1051
1052        let duration_ms = start.elapsed().as_millis() as i32;
1053
1054        // Record migration
1055        self.record_migration(&mut tx, id, current.tier, to_tier, reason)
1056            .await?;
1057
1058        tx.commit().await?;
1059
1060        info!(
1061            "Migrated memory {} from {:?} to {:?} in {}ms",
1062            id, current.tier, to_tier, duration_ms
1063        );
1064
1065        Ok(updated)
1066    }
1067
1068    async fn record_migration(
1069        &self,
1070        tx: &mut Transaction<'_, Postgres>,
1071        memory_id: Uuid,
1072        from_tier: MemoryTier,
1073        to_tier: MemoryTier,
1074        reason: Option<String>,
1075    ) -> Result<()> {
1076        sqlx::query(
1077            r#"
1078            INSERT INTO migration_history (memory_id, from_tier, to_tier, migration_reason, success)
1079            VALUES ($1, $2, $3, $4, true)
1080            "#,
1081        )
1082        .bind(memory_id)
1083        .bind(from_tier)
1084        .bind(to_tier)
1085        .bind(reason)
1086        .execute(&mut **tx)
1087        .await?;
1088
1089        Ok(())
1090    }
1091
1092    pub async fn get_expired_memories(&self) -> Result<Vec<Memory>> {
1093        let memories = sqlx::query_as::<_, Memory>(
1094            r#"
1095            SELECT * FROM memories 
1096            WHERE status = 'active' 
1097                AND expires_at IS NOT NULL 
1098                AND expires_at < NOW()
1099            "#,
1100        )
1101        .fetch_all(&self.pool)
1102        .await?;
1103
1104        Ok(memories)
1105    }
1106
1107    pub async fn cleanup_expired_memories(&self) -> Result<u64> {
1108        let result = sqlx::query(
1109            r#"
1110            UPDATE memories 
1111            SET status = 'deleted' 
1112            WHERE status = 'active' 
1113                AND expires_at IS NOT NULL 
1114                AND expires_at < NOW()
1115            "#,
1116        )
1117        .execute(&self.pool)
1118        .await?;
1119
1120        let count = result.rows_affected();
1121        if count > 0 {
1122            info!("Cleaned up {} expired memories", count);
1123        }
1124
1125        Ok(count)
1126    }
1127
1128    pub async fn get_migration_candidates(
1129        &self,
1130        tier: MemoryTier,
1131        limit: i64,
1132    ) -> Result<Vec<Memory>> {
1133        let query = match tier {
1134            MemoryTier::Working => {
1135                r#"
1136                SELECT * FROM memories 
1137                WHERE tier = 'working' 
1138                    AND status = 'active'
1139                    AND (
1140                        importance_score < 0.3 
1141                        OR (last_accessed_at IS NOT NULL 
1142                            AND last_accessed_at < NOW() - INTERVAL '24 hours')
1143                    )
1144                ORDER BY importance_score ASC, last_accessed_at ASC NULLS FIRST
1145                LIMIT $1
1146                "#
1147            }
1148            MemoryTier::Warm => {
1149                r#"
1150                SELECT * FROM memories 
1151                WHERE tier = 'warm' 
1152                    AND status = 'active'
1153                    AND importance_score < 0.1 
1154                    AND updated_at < NOW() - INTERVAL '7 days'
1155                ORDER BY importance_score ASC, updated_at ASC
1156                LIMIT $1
1157                "#
1158            }
1159            MemoryTier::Cold => {
1160                return Ok(Vec::new());
1161            }
1162            MemoryTier::Frozen => {
1163                return Ok(Vec::new()); // Frozen memories don't migrate further
1164            }
1165        };
1166
1167        let memories = sqlx::query_as::<_, Memory>(query)
1168            .bind(limit)
1169            .fetch_all(&self.pool)
1170            .await?;
1171
1172        Ok(memories)
1173    }
1174
1175    /// Get working memory pressure ratio (0.0 to 1.0)
1176    pub async fn get_working_memory_pressure(&self) -> Result<f64> {
1177        if let Some(ref config) = self.config {
1178            let working_count: i64 = sqlx::query_scalar(
1179                "SELECT COUNT(*) FROM memories WHERE tier = 'working' AND status = 'active'",
1180            )
1181            .fetch_one(&self.pool)
1182            .await?;
1183
1184            let pressure = working_count as f64 / config.tier_config.working_tier_limit as f64;
1185            Ok(pressure.min(1.0))
1186        } else {
1187            Ok(0.0)
1188        }
1189    }
1190
1191    pub async fn get_statistics(&self) -> Result<MemoryStatistics> {
1192        let stats = sqlx::query_as::<_, MemoryStatistics>(
1193            r#"
1194            SELECT 
1195                COUNT(*) FILTER (WHERE tier = 'working' AND status = 'active') as working_count,
1196                COUNT(*) FILTER (WHERE tier = 'warm' AND status = 'active') as warm_count,
1197                COUNT(*) FILTER (WHERE tier = 'cold' AND status = 'active') as cold_count,
1198                COUNT(*) FILTER (WHERE status = 'active') as total_active,
1199                COUNT(*) FILTER (WHERE status = 'deleted') as total_deleted,
1200                AVG(importance_score) FILTER (WHERE status = 'active') as avg_importance,
1201                MAX(access_count) FILTER (WHERE status = 'active') as max_access_count,
1202                CAST(AVG(access_count) FILTER (WHERE status = 'active') AS FLOAT8) as avg_access_count
1203            FROM memories
1204            "#,
1205        )
1206        .fetch_one(&self.pool)
1207        .await?;
1208
1209        Ok(stats)
1210    }
1211
1212    // Consolidation and freezing methods
1213
1214    /// Get consolidation analytics for all tiers
1215    pub async fn get_consolidation_analytics(&self) -> Result<Vec<ConsolidationAnalytics>> {
1216        let analytics = sqlx::query_as::<_, ConsolidationAnalytics>(
1217            r#"
1218            SELECT 
1219                tier,
1220                COUNT(*) as total_memories,
1221                AVG(consolidation_strength) as avg_consolidation_strength,
1222                AVG(recall_probability) as avg_recall_probability,
1223                AVG(decay_rate) as avg_decay_rate,
1224                AVG(EXTRACT(EPOCH FROM (NOW() - created_at)) / 86400) as avg_age_days,
1225                COUNT(*) FILTER (WHERE recall_probability < $1) as migration_candidates,
1226                COUNT(*) FILTER (WHERE last_accessed_at IS NULL) as never_accessed,
1227                COUNT(*) FILTER (WHERE last_accessed_at > NOW() - INTERVAL '24 hours') as accessed_recently
1228            FROM memories 
1229            WHERE status = 'active' 
1230            GROUP BY tier
1231            ORDER BY 
1232                CASE tier 
1233                    WHEN 'working' THEN 1 
1234                    WHEN 'warm' THEN 2 
1235                    WHEN 'cold' THEN 3 
1236                    WHEN 'frozen' THEN 4 
1237                END
1238            "#,
1239        )
1240        .bind(constants::FROZEN_MIGRATION_THRESHOLD)
1241        .fetch_all(&self.pool)
1242        .await?;
1243
1244        Ok(analytics)
1245    }
1246
1247    /* TEMPORARILY COMMENTED OUT - missing consolidation_events table
1248    /// Get consolidation event summary for the last week
1249    pub async fn get_consolidation_events(&self) -> Result<Vec<ConsolidationEventSummary>> {
1250        let events = sqlx::query_as::<_, ConsolidationEventSummary>(
1251            r#"
1252            SELECT
1253                event_type,
1254                COUNT(*) as event_count,
1255                AVG(new_consolidation_strength - previous_consolidation_strength) as avg_strength_change,
1256                AVG(COALESCE(new_recall_probability, 0) - COALESCE(previous_recall_probability, 0)) as avg_probability_change,
1257                AVG(EXTRACT(EPOCH FROM recall_interval) / 3600) as avg_recall_interval_hours
1258            FROM memory_consolidation_log
1259            WHERE created_at > NOW() - INTERVAL '7 days'
1260            GROUP BY event_type
1261            ORDER BY event_count DESC
1262            "#,
1263        )
1264        .fetch_all(&self.pool)
1265        .await?;
1266
1267        Ok(events)
1268    }
1269    */
1270
1271    /// Find memories ready for tier migration based on recall probability
1272    pub async fn find_migration_candidates(
1273        &self,
1274        tier: MemoryTier,
1275        limit: i32,
1276    ) -> Result<Vec<Memory>> {
1277        let threshold = match tier {
1278            MemoryTier::Working => 0.7,
1279            MemoryTier::Warm => 0.5,
1280            MemoryTier::Cold => 0.2,
1281            MemoryTier::Frozen => 0.0, // Frozen memories don't migrate
1282        };
1283
1284        let memories = sqlx::query_as::<_, Memory>(
1285            r#"
1286            SELECT * FROM memories 
1287            WHERE tier = $1 
1288            AND status = 'active'
1289            AND (recall_probability < $2 OR recall_probability IS NULL)
1290            ORDER BY recall_probability ASC NULLS LAST, consolidation_strength ASC
1291            LIMIT $3
1292            "#,
1293        )
1294        .bind(tier)
1295        .bind(threshold)
1296        .bind(limit)
1297        .fetch_all(&self.pool)
1298        .await?;
1299
1300        Ok(memories)
1301    }
1302
1303    /// Update memory consolidation parameters
1304    pub async fn update_consolidation(
1305        &self,
1306        memory_id: Uuid,
1307        consolidation_strength: f64,
1308        decay_rate: f64,
1309        recall_probability: Option<f64>,
1310    ) -> Result<()> {
1311        sqlx::query(
1312            r#"
1313            UPDATE memories 
1314            SET consolidation_strength = $2, 
1315                decay_rate = $3, 
1316                recall_probability = $4,
1317                updated_at = NOW()
1318            WHERE id = $1 AND status = 'active'
1319            "#,
1320        )
1321        .bind(memory_id)
1322        .bind(consolidation_strength)
1323        .bind(decay_rate)
1324        .bind(recall_probability)
1325        .execute(&self.pool)
1326        .await?;
1327
1328        Ok(())
1329    }
1330
1331    /// Log a consolidation event
1332    pub async fn log_consolidation_event(
1333        &self,
1334        memory_id: Uuid,
1335        event_type: &str,
1336        previous_strength: f64,
1337        new_strength: f64,
1338        previous_probability: Option<f64>,
1339        new_probability: Option<f64>,
1340        recall_interval: Option<PgInterval>,
1341        context: serde_json::Value,
1342    ) -> Result<()> {
1343        sqlx::query(
1344            r#"
1345            INSERT INTO memory_consolidation_log (
1346                memory_id, event_type, previous_consolidation_strength, 
1347                new_consolidation_strength, previous_recall_probability,
1348                new_recall_probability, recall_interval, access_context
1349            )
1350            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1351            "#,
1352        )
1353        .bind(memory_id)
1354        .bind(event_type)
1355        .bind(previous_strength)
1356        .bind(new_strength)
1357        .bind(previous_probability)
1358        .bind(new_probability)
1359        .bind(recall_interval)
1360        .bind(context)
1361        .execute(&self.pool)
1362        .await?;
1363
1364        Ok(())
1365    }
1366
    /// Freeze a memory by moving it to compressed storage using zstd compression.
    ///
    /// In one transaction: validates the memory is cold-tier with
    /// P(r) < 0.2, compresses content+metadata, inserts a
    /// `frozen_memories` row, flips the original row to tier 'frozen' /
    /// status 'archived', and appends a `migration_history` record.
    ///
    /// # Errors
    /// - `MemoryError::NotFound` when no active memory has `memory_id`.
    /// - `MemoryError::InvalidRequest` when the memory is not cold-tier or
    ///   its recall probability is >= 0.2.
    /// - Compression/validation errors from the compression engine.
    pub async fn freeze_memory(
        &self,
        memory_id: Uuid,
        reason: Option<String>,
    ) -> Result<FreezeMemoryResponse> {
        use super::compression::{FrozenMemoryCompression, ZstdCompressionEngine};
        use std::time::Instant;

        let start_time = Instant::now();
        let mut tx = self.pool.begin().await?;

        // Get the memory to freeze with validation
        let memory = sqlx::query_as::<_, Memory>(
            "SELECT * FROM memories WHERE id = $1 AND status = 'active'",
        )
        .bind(memory_id)
        .fetch_optional(&mut *tx)
        .await?
        .ok_or_else(|| MemoryError::NotFound {
            id: memory_id.to_string(),
        })?;

        // Ensure we only freeze cold memories with P(r) < 0.2
        if memory.tier != MemoryTier::Cold {
            return Err(MemoryError::InvalidRequest {
                message: format!(
                    "Can only freeze memories in cold tier, found {:?}",
                    memory.tier
                ),
            });
        }

        // Missing recall probability is treated as 0.0, i.e. freezable.
        let recall_probability = memory.recall_probability.unwrap_or(0.0);
        if recall_probability >= 0.2 {
            return Err(MemoryError::InvalidRequest {
                message: format!(
                    "Can only freeze memories with P(r) < 0.2, found {recall_probability:.3}"
                ),
            });
        }

        info!(
            "Freezing memory {} (P(r)={:.3}, content_length={})",
            memory_id,
            recall_probability,
            memory.content.len()
        );

        // Compress the memory data using zstd
        let compression_engine = ZstdCompressionEngine::new();
        let compression_result =
            compression_engine.compress_memory_data(&memory.content, &memory.metadata)?;

        // Validate compression quality (aborts the freeze if the ratio is
        // unacceptable for this content size).
        FrozenMemoryCompression::validate_compression_quality(
            compression_result.compression_ratio,
            memory.content.len(),
        )?;

        let (compressed_data, original_size, compressed_size, compression_ratio) =
            FrozenMemoryCompression::to_database_format(compression_result);

        debug!(
            "Compression completed: {:.2}:1 ratio, {} -> {} bytes",
            compression_ratio, original_size, compressed_size
        );

        // Create frozen memory record, preserving hash/embedding/tier so
        // the memory can later be restored by unfreeze_memory.
        let frozen_id = Uuid::new_v4();
        sqlx::query(
            r#"
            INSERT INTO frozen_memories (
                id, original_memory_id, compressed_content, 
                original_metadata, original_content_hash, original_embedding,
                original_tier, freeze_reason, compression_ratio,
                original_size_bytes, compressed_size_bytes
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
            "#,
        )
        .bind(frozen_id)
        .bind(memory.id)
        .bind(&compressed_data)
        .bind(&memory.metadata)
        .bind(&memory.content_hash)
        .bind(memory.embedding.as_ref())
        .bind(memory.tier)
        .bind(
            reason
                .as_deref()
                .unwrap_or("Auto-frozen: P(r) < 0.2 threshold"),
        )
        .bind(compression_ratio)
        .bind(original_size)
        .bind(compressed_size)
        .execute(&mut *tx)
        .await?;

        // Update original memory to frozen tier (archived, not deleted)
        sqlx::query(
            "UPDATE memories SET tier = 'frozen', status = 'archived', updated_at = NOW() WHERE id = $1"
        )
        .bind(memory_id)
        .execute(&mut *tx)
        .await?;

        // Log the migration
        let processing_time_ms = start_time.elapsed().as_millis() as i32;
        sqlx::query(
            r#"
            INSERT INTO migration_history (
                memory_id, from_tier, to_tier, migration_reason,
                migration_duration_ms, success
            ) VALUES ($1, $2, 'frozen', $3, $4, true)
            "#,
        )
        .bind(memory_id)
        .bind(memory.tier)
        .bind(format!("Frozen with {compression_ratio:.2}:1 compression"))
        .bind(processing_time_ms)
        .execute(&mut *tx)
        .await?;

        tx.commit().await?;

        info!(
            "Successfully froze memory {} with {:.2}:1 compression in {}ms",
            memory_id, compression_ratio, processing_time_ms
        );

        Ok(FreezeMemoryResponse {
            frozen_id,
            compression_ratio: Some(compression_ratio),
            original_tier: memory.tier,
            frozen_at: Utc::now(),
        })
    }
1504
1505    /// Unfreeze a memory and restore it to active status with zstd decompression
1506    pub async fn unfreeze_memory(
1507        &self,
1508        frozen_id: Uuid,
1509        target_tier: Option<MemoryTier>,
1510    ) -> Result<UnfreezeMemoryResponse> {
1511        use super::compression::ZstdCompressionEngine;
1512        use rand::Rng;
1513        use std::time::Instant;
1514        use tokio::time::{sleep, Duration};
1515
1516        let start_time = Instant::now();
1517        let mut tx = self.pool.begin().await?;
1518
1519        // Get the frozen memory details
1520        let frozen_memory =
1521            sqlx::query_as::<_, FrozenMemory>("SELECT * FROM frozen_memories WHERE id = $1")
1522                .bind(frozen_id)
1523                .fetch_optional(&mut *tx)
1524                .await?
1525                .ok_or_else(|| MemoryError::NotFound {
1526                    id: frozen_id.to_string(),
1527                })?;
1528
1529        info!(
1530            "Unfreezing memory {} (compression_ratio: {:.2}:1)",
1531            frozen_id,
1532            frozen_memory.compression_ratio.unwrap_or(0.0)
1533        );
1534
1535        // Implement intentional 2-5 second delay for frozen memory retrieval
1536        let mut rng = rand::thread_rng();
1537        let delay_seconds = rng.gen_range(2..=5);
1538
1539        info!(
1540            "Applying {}-second intentional delay for frozen tier retrieval",
1541            delay_seconds
1542        );
1543        sleep(Duration::from_secs(delay_seconds)).await;
1544
1545        // Decompress the memory data using zstd
1546        let compression_engine = ZstdCompressionEngine::new();
1547
1548        // First, try to extract the compressed data
1549        // The frozen_memory.compressed_content is stored as JSONB but contains BYTEA data
1550        let compressed_data = match &frozen_memory.compressed_content {
1551            serde_json::Value::String(base64_data) => {
1552                // If it's a base64 string, decode it
1553                BASE64_STANDARD
1554                    .decode(base64_data.as_bytes())
1555                    .map_err(|e| MemoryError::DecompressionError {
1556                        message: format!("Failed to decode base64 compressed data: {e}"),
1557                    })?
1558            }
1559            serde_json::Value::Array(byte_array) => {
1560                // If it's an array of numbers, convert to bytes
1561                byte_array
1562                    .iter()
1563                    .map(|v| v.as_u64().unwrap_or(0) as u8)
1564                    .collect()
1565            }
1566            _ => {
1567                // Fallback: treat as raw bytes (this shouldn't happen with proper BYTEA storage)
1568                return Err(MemoryError::DecompressionError {
1569                    message: "Invalid compressed data format in database".to_string(),
1570                });
1571            }
1572        };
1573
1574        let decompressed_data = compression_engine.decompress_memory_data(&compressed_data)?;
1575
1576        debug!(
1577            "Decompression completed: restored {} bytes of content",
1578            decompressed_data.content.len()
1579        );
1580
1581        // Determine restoration tier
1582        let restoration_tier = target_tier
1583            .or(Some(frozen_memory.original_tier))
1584            .unwrap_or(MemoryTier::Working);
1585
1586        // Restore the original memory
1587        let memory_id = frozen_memory.original_memory_id;
1588        let rows_affected = sqlx::query(
1589            r#"
1590            UPDATE memories 
1591            SET 
1592                content = $1,
1593                tier = $2,
1594                status = 'active',
1595                metadata = $3,
1596                updated_at = NOW()
1597            WHERE id = $4
1598            "#,
1599        )
1600        .bind(&decompressed_data.content)
1601        .bind(restoration_tier)
1602        .bind(&decompressed_data.metadata)
1603        .bind(memory_id)
1604        .execute(&mut *tx)
1605        .await?
1606        .rows_affected();
1607
1608        if rows_affected == 0 {
1609            // Create new memory if original was deleted
1610            sqlx::query(
1611                r#"
1612                INSERT INTO memories (
1613                    id, content, content_hash, embedding, tier, status,
1614                    importance_score, metadata, created_at, updated_at
1615                ) VALUES ($1, $2, $3, $4, $5, 'active', 0.5, $6, NOW(), NOW())
1616                "#,
1617            )
1618            .bind(memory_id)
1619            .bind(&decompressed_data.content)
1620            .bind(&frozen_memory.original_content_hash)
1621            .bind(frozen_memory.original_embedding.as_ref())
1622            .bind(restoration_tier)
1623            .bind(&decompressed_data.metadata)
1624            .execute(&mut *tx)
1625            .await?;
1626
1627            info!("Recreated deleted memory {} during unfreeze", memory_id);
1628        }
1629
1630        // Update frozen memory access tracking
1631        sqlx::query(
1632            r#"
1633            UPDATE frozen_memories 
1634            SET 
1635                unfreeze_count = COALESCE(unfreeze_count, 0) + 1,
1636                last_unfrozen_at = NOW(),
1637                updated_at = NOW()
1638            WHERE id = $1
1639            "#,
1640        )
1641        .bind(frozen_id)
1642        .execute(&mut *tx)
1643        .await?;
1644
1645        // Log the migration
1646        let processing_time_ms = start_time.elapsed().as_millis() as i32;
1647        sqlx::query(
1648            r#"
1649            INSERT INTO migration_history (
1650                memory_id, from_tier, to_tier, migration_reason,
1651                migration_duration_ms, success
1652            ) VALUES ($1, 'frozen', $2, $3, $4, true)
1653            "#,
1654        )
1655        .bind(memory_id)
1656        .bind(restoration_tier)
1657        .bind(format!("Unfrozen after {delay_seconds} second delay"))
1658        .bind(processing_time_ms)
1659        .execute(&mut *tx)
1660        .await?;
1661
1662        tx.commit().await?;
1663
1664        info!(
1665            "Successfully unfroze memory {} to {:?} tier in {}ms (including {}s delay)",
1666            memory_id, restoration_tier, processing_time_ms, delay_seconds
1667        );
1668
1669        Ok(UnfreezeMemoryResponse {
1670            memory_id,
1671            retrieval_delay_seconds: delay_seconds as i32,
1672            restoration_tier,
1673            unfrozen_at: Utc::now(),
1674        })
1675    }
1676
1677    /// Get all frozen memories with pagination
1678    pub async fn get_frozen_memories(&self, limit: i32, offset: i64) -> Result<Vec<FrozenMemory>> {
1679        let frozen_memories = sqlx::query_as::<_, FrozenMemory>(
1680            r#"
1681            SELECT * FROM frozen_memories 
1682            ORDER BY frozen_at DESC
1683            LIMIT $1 OFFSET $2
1684            "#,
1685        )
1686        .bind(limit)
1687        .bind(offset)
1688        .fetch_all(&self.pool)
1689        .await?;
1690
1691        Ok(frozen_memories)
1692    }
1693
1694    /// Search frozen memories by content or metadata
1695    pub async fn search_frozen_memories(
1696        &self,
1697        query: &str,
1698        limit: i32,
1699    ) -> Result<Vec<FrozenMemory>> {
1700        let frozen_memories = sqlx::query_as::<_, FrozenMemory>(
1701            r#"
1702            SELECT * FROM frozen_memories 
1703            WHERE 
1704                convert_from(compressed_content, 'UTF8') ILIKE $1
1705                OR freeze_reason ILIKE $1
1706            ORDER BY frozen_at DESC
1707            LIMIT $2
1708            "#,
1709        )
1710        .bind(format!("%{query}%"))
1711        .bind(limit)
1712        .fetch_all(&self.pool)
1713        .await?;
1714
1715        Ok(frozen_memories)
1716    }
1717
1718    /// Get tier statistics for monitoring
1719    pub async fn get_tier_statistics(&self) -> Result<Vec<MemoryTierStatistics>> {
1720        let stats = sqlx::query_as::<_, MemoryTierStatistics>(
1721            r#"
1722            SELECT * FROM memory_tier_statistics 
1723            WHERE snapshot_timestamp > NOW() - INTERVAL '24 hours'
1724            ORDER BY snapshot_timestamp DESC, tier
1725            "#,
1726        )
1727        .fetch_all(&self.pool)
1728        .await?;
1729
1730        Ok(stats)
1731    }
1732
1733    /// Update tier statistics (typically called by a background job)
1734    pub async fn update_tier_statistics(&self) -> Result<()> {
1735        sqlx::query("SELECT update_tier_statistics()")
1736            .execute(&self.pool)
1737            .await?;
1738
1739        Ok(())
1740    }
1741
1742    /// Search memories with consolidation criteria (SQL injection safe)
1743    pub async fn search_by_consolidation(
1744        &self,
1745        request: ConsolidationSearchRequest,
1746    ) -> Result<Vec<Memory>> {
1747        let limit = request.limit.unwrap_or(10);
1748        let offset = request.offset.unwrap_or(0);
1749
1750        // Use safe query builder to prevent SQL injection
1751        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
1752
1753        // Add consolidation strength range filter
1754        builder.add_consolidation_strength_range(
1755            request.min_consolidation_strength,
1756            request.max_consolidation_strength,
1757        );
1758
1759        // Add recall probability range filter
1760        builder.add_recall_probability_range(
1761            request.min_recall_probability,
1762            request.max_recall_probability,
1763        );
1764
1765        // Add tier filter if specified
1766        if let Some(tier) = &request.tier {
1767            builder.add_tier_filter(tier);
1768        }
1769
1770        // Exclude frozen tier if not included
1771        builder.add_exclude_frozen(!request.include_frozen.unwrap_or(false));
1772
1773        // Add ordering and pagination
1774        builder.add_condition(
1775            "ORDER BY consolidation_strength DESC, recall_probability DESC NULLS LAST",
1776        );
1777        builder.add_pagination(limit as usize, offset as usize)?;
1778
1779        // Build query and execute with parameterized binding
1780        let query = builder.build_query();
1781        let sqlx_query = builder.bind_parameters_as(sqlx::query_as::<_, Memory>(&query));
1782
1783        let memories = sqlx_query.fetch_all(&self.pool).await?;
1784        Ok(memories)
1785    }
1786
1787    /// Update three-component scores for specific memory
1788    pub async fn update_memory_scores(
1789        &self,
1790        memory_id: Uuid,
1791        recency_score: f64,
1792        relevance_score: f64,
1793    ) -> Result<()> {
1794        sqlx::query(
1795            r#"
1796            UPDATE memories 
1797            SET recency_score = $2, 
1798                relevance_score = $3,
1799                updated_at = NOW()
1800            WHERE id = $1 AND status = 'active'
1801            "#,
1802        )
1803        .bind(memory_id)
1804        .bind(recency_score)
1805        .bind(relevance_score)
1806        .execute(&self.pool)
1807        .await?;
1808
1809        Ok(())
1810    }
1811
1812    /// Batch update three-component scores for all active memories
1813    pub async fn batch_update_three_component_scores(&self) -> Result<i64> {
1814        let start_time = Instant::now();
1815
1816        let result = sqlx::query(
1817            r#"
1818            UPDATE memories 
1819            SET recency_score = calculate_recency_score(last_accessed_at, created_at, 0.005),
1820                relevance_score = LEAST(1.0, 
1821                    0.5 * importance_score + 
1822                    0.3 * LEAST(1.0, access_count / 100.0) + 
1823                    0.2
1824                ),
1825                updated_at = NOW()
1826            WHERE status = 'active'
1827            "#,
1828        )
1829        .execute(&self.pool)
1830        .await?;
1831
1832        let duration = start_time.elapsed();
1833        info!(
1834            "Updated three-component scores for {} memories in {:?}",
1835            result.rows_affected(),
1836            duration
1837        );
1838
1839        Ok(result.rows_affected() as i64)
1840    }
1841
1842    /// Get memories ranked by three-component combined score using generated column
1843    pub async fn get_memories_by_combined_score(
1844        &self,
1845        tier: Option<MemoryTier>,
1846        limit: Option<i32>,
1847        recency_weight: Option<f64>,
1848        importance_weight: Option<f64>,
1849        relevance_weight: Option<f64>,
1850    ) -> Result<Vec<Memory>> {
1851        let limit = limit.unwrap_or(50);
1852
1853        // Note: Custom weights are not supported with the generated column approach
1854        // The generated column uses fixed weights: 0.333, 0.333, 0.334
1855        // This is a trade-off for P99 <1ms performance
1856        if recency_weight.is_some() || importance_weight.is_some() || relevance_weight.is_some() {
1857            warn!(
1858                "Custom weights not supported with generated combined_score column. Using fixed weights: 0.333, 0.333, 0.334"
1859            );
1860        }
1861
1862        let query = if let Some(tier) = tier {
1863            sqlx::query_as::<_, Memory>(
1864                r#"
1865                SELECT m.*
1866                FROM memories m
1867                WHERE m.status = 'active'
1868                  AND m.tier = $1
1869                ORDER BY m.combined_score DESC, m.updated_at DESC
1870                LIMIT $2
1871                "#,
1872            )
1873            .bind(tier)
1874            .bind(limit as i64)
1875        } else {
1876            sqlx::query_as::<_, Memory>(
1877                r#"
1878                SELECT m.*
1879                FROM memories m
1880                WHERE m.status = 'active'
1881                ORDER BY m.combined_score DESC, m.updated_at DESC
1882                LIMIT $1
1883                "#,
1884            )
1885            .bind(limit as i64)
1886        };
1887
1888        let memories = query.fetch_all(&self.pool).await?;
1889
1890        debug!(
1891            "Retrieved {} memories ranked by generated combined_score for tier {:?}",
1892            memories.len(),
1893            tier
1894        );
1895
1896        Ok(memories)
1897    }
1898
1899    // Simple Consolidation Integration Methods
1900
1901    /// Get memories for consolidation processing with batch optimization (SQL injection safe)
1902    pub async fn get_memories_for_consolidation(
1903        &self,
1904        tier: Option<MemoryTier>,
1905        batch_size: usize,
1906        min_hours_since_last_processing: f64,
1907    ) -> Result<Vec<Memory>> {
1908        // Use safe query builder to prevent SQL injection
1909        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
1910
1911        // Add time interval condition safely
1912        builder.add_last_access_interval(min_hours_since_last_processing);
1913
1914        // Add tier filter if specified
1915        if let Some(tier) = tier {
1916            builder.add_tier_filter(&tier);
1917        }
1918
1919        // Add complex ordering with recall probability condition
1920        let threshold_bind_index = builder.bind_index;
1921        builder.add_recall_threshold_condition(constants::COLD_MIGRATION_THRESHOLD);
1922
1923        let order_condition = format!(
1924            "ORDER BY CASE WHEN recall_probability IS NULL THEN 1 WHEN recall_probability < ${threshold_bind_index} THEN 2 ELSE 3 END, last_accessed_at ASC NULLS FIRST, consolidation_strength ASC"
1925        );
1926        builder.add_condition(&order_condition);
1927
1928        // Add pagination (limit only, no offset for this use case)
1929        builder.add_pagination(batch_size, 0)?;
1930
1931        // Build query and execute with parameterized binding
1932        let query = builder.build_query();
1933        let sqlx_query = builder.bind_parameters_as(sqlx::query_as::<_, Memory>(&query));
1934
1935        let memories = sqlx_query.fetch_all(&self.pool).await?;
1936        Ok(memories)
1937    }
1938
1939    /// Batch update consolidation values for multiple memories
1940    pub async fn batch_update_consolidation(
1941        &self,
1942        updates: &[(Uuid, f64, f64)], // (id, new_strength, recall_probability)
1943    ) -> Result<usize> {
1944        if updates.is_empty() {
1945            return Ok(0);
1946        }
1947
1948        let mut tx = self.pool.begin().await?;
1949        let mut updated_count = 0;
1950
1951        for (memory_id, new_strength, recall_prob) in updates {
1952            let result = sqlx::query(
1953                r#"
1954                UPDATE memories 
1955                SET consolidation_strength = $1, 
1956                    recall_probability = $2,
1957                    updated_at = NOW()
1958                WHERE id = $3 AND status = 'active'
1959                "#,
1960            )
1961            .bind(new_strength)
1962            .bind(recall_prob)
1963            .bind(memory_id)
1964            .execute(&mut *tx)
1965            .await?;
1966
1967            updated_count += result.rows_affected() as usize;
1968        }
1969
1970        tx.commit().await?;
1971        Ok(updated_count)
1972    }
1973
1974    /// Batch migrate memories to new tiers
1975    pub async fn batch_migrate_memories(
1976        &self,
1977        migrations: &[(Uuid, MemoryTier)], // (memory_id, target_tier)
1978    ) -> Result<usize> {
1979        if migrations.is_empty() {
1980            return Ok(0);
1981        }
1982
1983        let mut tx = self.pool.begin().await?;
1984        let mut migrated_count = 0;
1985
1986        for (memory_id, target_tier) in migrations {
1987            // Get current tier for migration logging
1988            let current_memory: Option<(MemoryTier,)> =
1989                sqlx::query_as("SELECT tier FROM memories WHERE id = $1 AND status = 'active'")
1990                    .bind(memory_id)
1991                    .fetch_optional(&mut *tx)
1992                    .await?;
1993
1994            if let Some((current_tier,)) = current_memory {
1995                // Update the tier
1996                let result = sqlx::query(
1997                    r#"
1998                    UPDATE memories 
1999                    SET tier = $1, updated_at = NOW()
2000                    WHERE id = $2 AND status = 'active'
2001                    "#,
2002                )
2003                .bind(target_tier)
2004                .bind(memory_id)
2005                .execute(&mut *tx)
2006                .await?;
2007
2008                if result.rows_affected() > 0 {
2009                    migrated_count += 1;
2010
2011                    // Log the migration
2012                    self.record_migration(
2013                        &mut tx,
2014                        *memory_id,
2015                        current_tier,
2016                        *target_tier,
2017                        Some("Simple consolidation automatic migration".to_string()),
2018                    )
2019                    .await?;
2020                }
2021            }
2022        }
2023
2024        tx.commit().await?;
2025        Ok(migrated_count)
2026    }
2027
2028    /// Get migration candidates using simple consolidation formula (SQL injection safe)
2029    pub async fn get_simple_consolidation_candidates(
2030        &self,
2031        tier: Option<MemoryTier>,
2032        threshold: f64,
2033        limit: usize,
2034    ) -> Result<Vec<Memory>> {
2035        // Use safe query builder to prevent SQL injection
2036        let mut builder = SafeQueryBuilder::new(
2037            "SELECT * FROM memories WHERE status = 'active' AND (recall_probability < $1 OR recall_probability IS NULL)"
2038        );
2039
2040        // Add tier filter if specified
2041        if let Some(tier) = tier {
2042            builder.add_tier_filter(&tier);
2043        }
2044
2045        // Add ordering
2046        builder.add_condition("ORDER BY COALESCE(recall_probability, 0) ASC, consolidation_strength ASC, last_accessed_at ASC NULLS FIRST");
2047
2048        // Add pagination (limit only, no offset for this use case)
2049        builder.add_pagination(limit, 0)?;
2050
2051        // Build query and execute with parameterized binding
2052        let query = builder.build_query();
2053        let mut sqlx_query = sqlx::query_as::<_, Memory>(&query).bind(threshold);
2054        sqlx_query = builder.bind_parameters_as(sqlx_query);
2055
2056        let memories = sqlx_query.fetch_all(&self.pool).await?;
2057        Ok(memories)
2058    }
2059
2060    /// Log simple consolidation event with performance metrics
2061    pub async fn log_simple_consolidation_event(
2062        &self,
2063        memory_id: Uuid,
2064        previous_strength: f64,
2065        new_strength: f64,
2066        previous_probability: Option<f64>,
2067        new_probability: f64,
2068        processing_time_ms: u64,
2069    ) -> Result<()> {
2070        let context = serde_json::json!({
2071            "engine": "simple_consolidation",
2072            "processing_time_ms": processing_time_ms,
2073            "strength_delta": new_strength - previous_strength,
2074            "probability_delta": new_probability - previous_probability.unwrap_or(0.0)
2075        });
2076
2077        self.log_consolidation_event(
2078            memory_id,
2079            "simple_consolidation",
2080            previous_strength,
2081            new_strength,
2082            previous_probability,
2083            Some(new_probability),
2084            None, // Simple consolidation doesn't track recall intervals
2085            context,
2086        )
2087        .await
2088    }
2089
2090    /// Get simple consolidation statistics
2091    pub async fn get_simple_consolidation_stats(&self) -> Result<SimpleConsolidationStats> {
2092        let stats = sqlx::query_as::<_, SimpleConsolidationStats>(
2093            r#"
2094            SELECT 
2095                COUNT(*) FILTER (WHERE recall_probability < $1) as migration_candidates,
2096                COUNT(*) FILTER (WHERE consolidation_strength > 5.0) as highly_consolidated,
2097                AVG(consolidation_strength) as avg_consolidation_strength,
2098                AVG(recall_probability) as avg_recall_probability,
2099                COUNT(*) FILTER (WHERE last_accessed_at > NOW() - INTERVAL '24 hours') as recently_accessed,
2100                COUNT(*) as total_active_memories
2101            FROM memories 
2102            WHERE status = 'active'
2103            "#,
2104        )
2105        .bind(constants::COLD_MIGRATION_THRESHOLD)
2106        .fetch_one(&self.pool)
2107        .await?;
2108
2109        Ok(stats)
2110    }
2111
2112    /// Get trigger metrics if trigger engine is available
2113    pub async fn get_trigger_metrics(&self) -> Option<super::event_triggers::TriggerMetrics> {
2114        if let Some(trigger_engine) = &self.trigger_engine {
2115            Some(trigger_engine.get_metrics().await)
2116        } else {
2117            None
2118        }
2119    }
2120
2121    /// Reset trigger metrics if trigger engine is available
2122    pub async fn reset_trigger_metrics(&self) -> Result<()> {
2123        if let Some(trigger_engine) = &self.trigger_engine {
2124            trigger_engine.reset_metrics().await?;
2125        }
2126        Ok(())
2127    }
2128
2129    /// Add user-specific trigger customization
2130    pub async fn add_user_trigger_customization(
2131        &self,
2132        user_id: String,
2133        customizations: std::collections::HashMap<
2134            super::event_triggers::TriggerEvent,
2135            super::event_triggers::TriggerPattern,
2136        >,
2137    ) -> Result<()> {
2138        if let Some(trigger_engine) = &self.trigger_engine {
2139            trigger_engine
2140                .add_user_customization(user_id, customizations)
2141                .await?;
2142        }
2143        Ok(())
2144    }
2145
    /// Check if trigger engine is enabled
    ///
    /// Returns `true` when this repository was constructed with an
    /// event-triggered scoring engine attached.
    pub fn has_trigger_engine(&self) -> bool {
        self.trigger_engine.is_some()
    }
2150
2151    /// Batch freeze memories that meet migration criteria (P(recall) < 0.2)
2152    pub async fn batch_freeze_by_recall_probability(
2153        &self,
2154        max_batch_size: Option<usize>,
2155    ) -> Result<BatchFreezeResult> {
2156        use std::time::Instant;
2157
2158        let start_time = Instant::now();
2159        let batch_size = max_batch_size.unwrap_or(100_000); // Default to 100K as per requirements
2160
2161        // Find memories in Cold tier with P(recall) < 0.2
2162        let candidates = sqlx::query_as::<_, Memory>(
2163            r#"
2164            SELECT * FROM memories 
2165            WHERE tier = 'cold' 
2166            AND status = 'active'
2167            AND COALESCE(recall_probability, 0) < 0.2
2168            ORDER BY recall_probability ASC, last_accessed_at ASC
2169            LIMIT $1
2170            "#,
2171        )
2172        .bind(batch_size as i64)
2173        .fetch_all(&self.pool)
2174        .await?;
2175
2176        let mut frozen_ids = Vec::new();
2177        let mut total_space_saved = 0u64;
2178        let mut compression_ratios = Vec::new();
2179
2180        info!("Starting batch freeze of {} memories", candidates.len());
2181
2182        // Process in smaller chunks to avoid transaction timeouts
2183        for chunk in candidates.chunks(1000) {
2184            let mut tx = self.pool.begin().await?;
2185
2186            for memory in chunk {
2187                // Call freeze function for each memory
2188                match sqlx::query("SELECT freeze_memory($1) as frozen_id")
2189                    .bind(memory.id)
2190                    .fetch_one(&mut *tx)
2191                    .await
2192                {
2193                    Ok(row) => {
2194                        let frozen_id: Uuid = row.get("frozen_id");
2195                        frozen_ids.push(frozen_id);
2196
2197                        // Estimate space saved (original content vs compressed)
2198                        let original_size = memory.content.len() as u64;
2199                        let estimated_compressed_size = original_size / 6; // Assume ~6:1 compression
2200                        total_space_saved += original_size - estimated_compressed_size;
2201                        compression_ratios.push(6.0);
2202                    }
2203                    Err(e) => {
2204                        warn!("Failed to freeze memory {}: {}", memory.id, e);
2205                        continue;
2206                    }
2207                }
2208            }
2209
2210            tx.commit().await?;
2211        }
2212
2213        let processing_time = start_time.elapsed();
2214        let avg_compression_ratio = if !compression_ratios.is_empty() {
2215            compression_ratios.iter().sum::<f32>() / compression_ratios.len() as f32
2216        } else {
2217            0.0
2218        };
2219
2220        info!(
2221            "Batch freeze completed: {} memories frozen in {:?}, avg compression: {:.1}:1",
2222            frozen_ids.len(),
2223            processing_time,
2224            avg_compression_ratio
2225        );
2226
2227        Ok(BatchFreezeResult {
2228            memories_frozen: frozen_ids.len() as u32,
2229            total_space_saved_bytes: total_space_saved,
2230            average_compression_ratio: avg_compression_ratio,
2231            processing_time_ms: processing_time.as_millis() as u64,
2232            frozen_memory_ids: frozen_ids,
2233        })
2234    }
2235
2236    /// Batch unfreeze memories
2237    pub async fn batch_unfreeze_memories(
2238        &self,
2239        frozen_ids: Vec<Uuid>,
2240        target_tier: Option<MemoryTier>,
2241    ) -> Result<BatchUnfreezeResult> {
2242        use std::time::Instant;
2243
2244        let start_time = Instant::now();
2245        let mut unfrozen_memory_ids = Vec::new();
2246        let mut total_delay_seconds = 0i32;
2247
2248        info!("Starting batch unfreeze of {} memories", frozen_ids.len());
2249
2250        // Process in smaller chunks to manage delays and transactions
2251        for chunk in frozen_ids.chunks(100) {
2252            for frozen_id in chunk {
2253                match self.unfreeze_memory(*frozen_id, target_tier).await {
2254                    Ok(response) => {
2255                        unfrozen_memory_ids.push(response.memory_id);
2256                        total_delay_seconds += response.retrieval_delay_seconds;
2257                    }
2258                    Err(e) => {
2259                        warn!("Failed to unfreeze memory {}: {}", frozen_id, e);
2260                        continue;
2261                    }
2262                }
2263            }
2264        }
2265
2266        let processing_time = start_time.elapsed();
2267        let avg_delay_seconds = if !unfrozen_memory_ids.is_empty() {
2268            total_delay_seconds as f32 / unfrozen_memory_ids.len() as f32
2269        } else {
2270            0.0
2271        };
2272
2273        info!(
2274            "Batch unfreeze completed: {} memories unfrozen in {:?}, avg delay: {:.1}s",
2275            unfrozen_memory_ids.len(),
2276            processing_time,
2277            avg_delay_seconds
2278        );
2279
2280        Ok(BatchUnfreezeResult {
2281            memories_unfrozen: unfrozen_memory_ids.len() as u32,
2282            total_processing_time_ms: processing_time.as_millis() as u64,
2283            average_delay_seconds: avg_delay_seconds,
2284            unfrozen_memory_ids,
2285        })
2286    }
2287
2288    // ==========================================
2289    // Harvest Session Management Methods
2290    // ==========================================
2291
2292    /*
2293    /// Create a new harvest session
2294    pub async fn create_harvest_session(
2295        &self,
2296        request: CreateHarvestSessionRequest,
2297    ) -> Result<HarvestSession> {
2298        let session_id = Uuid::new_v4();
2299        let now = Utc::now();
2300
2301        let config_snapshot = request
2302            .config_snapshot
2303            .unwrap_or_else(|| serde_json::json!({}));
2304
2305        let session = sqlx::query_as!(
2306            HarvestSession,
2307            r#"
2308            INSERT INTO harvest_sessions (
2309                id, session_type, trigger_reason, started_at, status,
2310                messages_processed, patterns_extracted, patterns_stored,
2311                duplicates_filtered, processing_time_ms, config_snapshot,
2312                error_message, retry_count, extraction_time_ms,
2313                deduplication_time_ms, storage_time_ms, created_at
2314            ) VALUES (
2315                $1, $2, $3, $4, $5, 0, 0, 0, 0, 0, $6, NULL, 0, 0, 0, 0, $7
2316            )
2317            RETURNING id, session_type as "session_type: HarvestSessionType",
2318                     trigger_reason, started_at, completed_at,
2319                     status as "status: HarvestSessionStatus",
2320                     messages_processed, patterns_extracted, patterns_stored,
2321                     duplicates_filtered, processing_time_ms, config_snapshot,
2322                     error_message, retry_count, extraction_time_ms,
2323                     deduplication_time_ms, storage_time_ms,
2324                     memory_usage_mb, cpu_usage_percent, created_at
2325            "#,
2326            session_id,
2327            request.session_type as HarvestSessionType,
2328            request.trigger_reason,
2329            now,
2330            HarvestSessionStatus::InProgress as HarvestSessionStatus,
2331            config_snapshot,
2332            now
2333        )
2334        .fetch_one(&self.pool)
2335        .await
2336        .map_err(|e| MemoryError::DatabaseError {
2337            message: format!("Failed to create harvest session: {}", e),
2338        })?;
2339
2340        Ok(session)
2341    }
2342    */
2343
2344    /*
2345    /// Update an existing harvest session
2346    pub async fn update_harvest_session(
2347        &self,
2348        session_id: Uuid,
2349        request: UpdateHarvestSessionRequest,
2350    ) -> Result<HarvestSession> {
2351        let mut tx = self
2352            .pool
2353            .begin()
2354            .await
2355            .map_err(|e| MemoryError::DatabaseError {
2356                message: format!("Failed to begin transaction: {}", e),
2357            })?;
2358
2359        // Build dynamic update query
2360        let mut set_clauses = Vec::new();
2361        let mut param_index = 2; // $1 is reserved for session_id
2362
2363        if request.status.is_some() {
2364            set_clauses.push(format!("status = ${}", param_index));
2365            param_index += 1;
2366        }
2367        if request.messages_processed.is_some() {
2368            set_clauses.push(format!("messages_processed = ${}", param_index));
2369            param_index += 1;
2370        }
2371        if request.patterns_extracted.is_some() {
2372            set_clauses.push(format!("patterns_extracted = ${}", param_index));
2373            param_index += 1;
2374        }
2375        if request.patterns_stored.is_some() {
2376            set_clauses.push(format!("patterns_stored = ${}", param_index));
2377            param_index += 1;
2378        }
2379        if request.duplicates_filtered.is_some() {
2380            set_clauses.push(format!("duplicates_filtered = ${}", param_index));
2381            param_index += 1;
2382        }
2383        if request.processing_time_ms.is_some() {
2384            set_clauses.push(format!("processing_time_ms = ${}", param_index));
2385            param_index += 1;
2386        }
2387        if request.error_message.is_some() {
2388            set_clauses.push(format!("error_message = ${}", param_index));
2389            param_index += 1;
2390        }
2391
2392        if set_clauses.is_empty() {
2393            return self.get_harvest_session(session_id).await;
2394        }
2395
2396        let query = format!(
2397            r#"
2398            UPDATE harvest_sessions
2399            SET {}
2400            WHERE id = $1
2401            RETURNING id, session_type as "session_type: HarvestSessionType",
2402                     trigger_reason, started_at, completed_at,
2403                     status as "status: HarvestSessionStatus",
2404                     messages_processed, patterns_extracted, patterns_stored,
2405                     duplicates_filtered, processing_time_ms, config_snapshot,
2406                     error_message, retry_count, extraction_time_ms,
2407                     deduplication_time_ms, storage_time_ms,
2408                     memory_usage_mb, cpu_usage_percent, created_at
2409            "#,
2410            set_clauses.join(", ")
2411        );
2412
2413        let mut query_builder = sqlx::query_as::<_, HarvestSession>(&query);
2414        query_builder = query_builder.bind(session_id);
2415
2416        if let Some(status) = request.status {
2417            query_builder = query_builder.bind(status as HarvestSessionStatus);
2418        }
2419        if let Some(messages_processed) = request.messages_processed {
2420            query_builder = query_builder.bind(messages_processed);
2421        }
2422        if let Some(patterns_extracted) = request.patterns_extracted {
2423            query_builder = query_builder.bind(patterns_extracted);
2424        }
2425        if let Some(patterns_stored) = request.patterns_stored {
2426            query_builder = query_builder.bind(patterns_stored);
2427        }
2428        if let Some(duplicates_filtered) = request.duplicates_filtered {
2429            query_builder = query_builder.bind(duplicates_filtered);
2430        }
2431        if let Some(processing_time_ms) = request.processing_time_ms {
2432            query_builder = query_builder.bind(processing_time_ms);
2433        }
2434        if let Some(error_message) = request.error_message {
2435            query_builder = query_builder.bind(error_message);
2436        }
2437
2438        let session =
2439            query_builder
2440                .fetch_one(&mut *tx)
2441                .await
2442                .map_err(|e| MemoryError::DatabaseError {
2443                    message: format!("Failed to update harvest session: {}", e),
2444                })?;
2445
2446        tx.commit().await.map_err(|e| MemoryError::DatabaseError {
2447            message: format!("Failed to commit harvest session update: {}", e),
2448        })?;
2449
2450        Ok(session)
2451    }
2452    */
2453
2454    /*
2455    /// Get a harvest session by ID
2456    pub async fn get_harvest_session(&self, session_id: Uuid) -> Result<HarvestSession> {
2457        let session = sqlx::query_as!(
2458            HarvestSession,
2459            r#"
2460            SELECT id, session_type as "session_type: HarvestSessionType",
2461                   trigger_reason, started_at, completed_at,
2462                   status as "status: HarvestSessionStatus",
2463                   messages_processed, patterns_extracted, patterns_stored,
2464                   duplicates_filtered, processing_time_ms, config_snapshot,
2465                   error_message, retry_count, extraction_time_ms,
2466                   deduplication_time_ms, storage_time_ms,
2467                   memory_usage_mb, cpu_usage_percent, created_at
2468            FROM harvest_sessions
2469            WHERE id = $1
2470            "#,
2471            session_id
2472        )
2473        .fetch_one(&self.pool)
2474        .await
2475        .map_err(|e| MemoryError::DatabaseError {
2476            message: format!("Harvest session not found: {}", e),
2477        })?;
2478
2479        Ok(session)
2480    }
2481    */
2482
2483    /*
2484    /// Get harvest success rate statistics
2485    pub async fn get_harvest_success_rate(&self, days_back: i32) -> Result<HarvestSuccessRate> {
2486        let stats = sqlx::query_as!(
2487            HarvestSuccessRate,
2488            r#"
2489            SELECT
2490                COUNT(*)::INTEGER as total_sessions,
2491                COUNT(*) FILTER (WHERE status = 'completed')::INTEGER as successful_sessions,
2492                COUNT(*) FILTER (WHERE status = 'failed')::INTEGER as failed_sessions,
2493                (COUNT(*) FILTER (WHERE status = 'completed')::FLOAT / GREATEST(COUNT(*), 1)::FLOAT) as success_rate,
2494                COALESCE(AVG(processing_time_ms), 0)::FLOAT as average_processing_time_ms
2495            FROM harvest_sessions
2496            WHERE started_at > NOW() - ($1 || ' days')::INTERVAL
2497            "#,
2498            days_back
2499        )
2500        .fetch_one(&self.pool)
2501        .await
2502        .map_err(|e| MemoryError::DatabaseError {
2503            message: format!("Failed to get harvest success rate: {}", e),
2504        })?;
2505
2506        Ok(stats)
2507    }
2508    */
2509
2510    // ==========================================
2511    // Harvest Pattern Management Methods
2512    // ==========================================
2513
2514    /*
2515    /// Create a new harvest pattern
2516    pub async fn create_harvest_pattern(
2517        &self,
2518        request: CreateHarvestPatternRequest,
2519    ) -> Result<HarvestPattern> {
2520        let pattern_id = Uuid::new_v4();
2521        let now = Utc::now();
2522        let metadata = request.metadata.unwrap_or_else(|| serde_json::json!({}));
2523
2524        let pattern = sqlx::query_as!(
2525            HarvestPattern,
2526            r#"
2527            INSERT INTO harvest_patterns (
2528                id, harvest_session_id, pattern_type, content, confidence_score,
2529                source_message_id, context, metadata, status, memory_id,
2530                rejection_reason, extraction_confidence, similarity_to_existing,
2531                extracted_at
2532            ) VALUES (
2533                $1, $2, $3, $4, $5, $6, $7, $8, 'extracted', NULL, NULL, NULL, NULL, $9
2534            )
2535            RETURNING id, harvest_session_id,
2536                     pattern_type as "pattern_type: HarvestPatternType",
2537                     content, confidence_score, source_message_id, context,
2538                     metadata, status as "status: HarvestPatternStatus",
2539                     memory_id, rejection_reason, extraction_confidence,
2540                     similarity_to_existing, extracted_at
2541            "#,
2542            pattern_id,
2543            request.harvest_session_id,
2544            request.pattern_type as HarvestPatternType,
2545            request.content,
2546            request.confidence_score,
2547            request.source_message_id,
2548            request.context,
2549            metadata,
2550            now
2551        )
2552        .fetch_one(&self.pool)
2553        .await
2554        .map_err(|e| MemoryError::DatabaseError {
2555            message: format!("Failed to create harvest pattern: {}", e),
2556        })?;
2557
2558        Ok(pattern)
2559    }
2560    */
2561
2562    /*
2563    /// Get top performing harvest patterns
2564    pub async fn get_top_harvest_patterns(
2565        &self,
2566        limit: i32,
2567        days_back: i32,
2568    ) -> Result<Vec<TopHarvestPattern>> {
2569        let patterns = sqlx::query_as!(
2570            TopHarvestPattern,
2571            r#"
2572            SELECT
2573                pattern_type as "pattern_type: HarvestPatternType",
2574                COUNT(*)::INTEGER as total_extracted,
2575                COUNT(*) FILTER (WHERE status = 'stored')::INTEGER as total_stored,
2576                AVG(confidence_score)::FLOAT as avg_confidence,
2577                (COUNT(*) FILTER (WHERE status = 'stored')::FLOAT / COUNT(*)::FLOAT) as success_rate
2578            FROM harvest_patterns
2579            WHERE extracted_at > NOW() - ($2 || ' days')::INTERVAL
2580            GROUP BY pattern_type
2581            ORDER BY success_rate DESC, total_stored DESC
2582            LIMIT $1
2583            "#,
2584            limit,
2585            days_back
2586        )
2587        .fetch_all(&self.pool)
2588        .await
2589        .map_err(|e| MemoryError::DatabaseError {
2590            message: format!("Failed to get top harvest patterns: {}", e),
2591        })?;
2592
2593        Ok(patterns)
2594    }
2595    */
2596
2597    // ==========================================
2598    // Consolidation Event Management Methods
2599    // ==========================================
2600
2601    /* TEMPORARILY COMMENTED OUT - missing consolidation_events table
2602    /// Create a consolidation event
2603    pub async fn create_consolidation_event(
2604        &self,
2605        request: CreateConsolidationEventRequest,
2606    ) -> Result<ConsolidationEvent> {
2607        let event_id = Uuid::new_v4();
2608        let now = Utc::now();
2609        let context_metadata = request
2610            .context_metadata
2611            .unwrap_or_else(|| serde_json::json!({}));
2612
2613        // Calculate deltas if both old and new values are provided
2614        let strength_delta = match (
2615            request.old_consolidation_strength,
2616            request.new_consolidation_strength,
2617        ) {
2618            (Some(old), Some(new)) => Some(new - old),
2619            _ => None,
2620        };
2621
2622        let probability_delta = match (
2623            request.old_recall_probability,
2624            request.new_recall_probability,
2625        ) {
2626            (Some(old), Some(new)) => Some(new - old),
2627            _ => None,
2628        };
2629
2630        let event = sqlx::query_as!(
2631            ConsolidationEvent,
2632            r#"
2633            INSERT INTO consolidation_events (
2634                id, event_type, memory_id, source_tier, target_tier,
2635                migration_reason, old_consolidation_strength, new_consolidation_strength,
2636                strength_delta, old_recall_probability, new_recall_probability,
2637                probability_delta, processing_time_ms, triggered_by,
2638                context_metadata, created_at
2639            ) VALUES (
2640                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, NULL, $13, $14, $15
2641            )
2642            RETURNING id, event_type as "event_type: ConsolidationEventType",
2643                     memory_id, source_tier, target_tier, migration_reason,
2644                     old_consolidation_strength, new_consolidation_strength,
2645                     strength_delta, old_recall_probability, new_recall_probability,
2646                     probability_delta, processing_time_ms, triggered_by,
2647                     context_metadata, created_at
2648            "#,
2649            event_id,
2650            request.event_type as ConsolidationEventType,
2651            request.memory_id,
2652            request.source_tier,
2653            request.target_tier,
2654            request.migration_reason,
2655            request.old_consolidation_strength,
2656            request.new_consolidation_strength,
2657            strength_delta,
2658            request.old_recall_probability,
2659            request.new_recall_probability,
2660            probability_delta,
2661            request.triggered_by,
2662            context_metadata,
2663            now
2664        )
2665        .fetch_one(&self.pool)
2666        .await
2667        .map_err(|e| MemoryError::DatabaseError {
2668            message: format!("Failed to create consolidation event: {}", e),
2669        })?;
2670
2671        Ok(event)
2672    }
2673    */
2674
2675    /* TEMPORARILY COMMENTED OUT - missing consolidation_events table
2676    /// Get tier migration statistics
2677    pub async fn get_tier_migration_stats(
2678        &self,
2679        days_back: i32,
2680    ) -> Result<Vec<TierMigrationStats>> {
2681        let stats = sqlx::query_as!(
2682            TierMigrationStats,
2683            r#"
2684            SELECT
2685                COALESCE(ce.source_tier, 'unknown') as source_tier,
2686                COALESCE(ce.target_tier, 'unknown') as target_tier,
2687                COUNT(*)::INTEGER as migration_count,
2688                COALESCE(AVG(ce.processing_time_ms), 0)::FLOAT as avg_processing_time_ms,
2689                -- Calculate success rate by checking if memory actually moved to target tier
2690                (COUNT(*) FILTER (WHERE m.tier::text = ce.target_tier)::FLOAT / COUNT(*)::FLOAT) as success_rate
2691            FROM consolidation_events ce
2692            JOIN memories m ON ce.memory_id = m.id
2693            WHERE ce.event_type = 'tier_migration'
2694            AND ce.created_at > NOW() - ($1 || ' days')::INTERVAL
2695            GROUP BY ce.source_tier, ce.target_tier
2696            ORDER BY migration_count DESC
2697            "#,
2698            days_back
2699        )
2700        .fetch_all(&self.pool)
2701        .await
2702        .map_err(|e| MemoryError::DatabaseError {
2703            message: format!("Failed to get tier migration stats: {}", e),
2704        })?;
2705
2706        Ok(stats)
2707    }
2708    */
2709
2710    // ==========================================
2711    // Memory Access Log Management Methods
2712    // ==========================================
2713
2714    /* TEMPORARILY COMMENTED OUT - missing memory_access_log table
2715    /// Create a memory access log entry
2716    pub async fn create_memory_access_log(
2717        &self,
2718        request: CreateMemoryAccessLogRequest,
2719    ) -> Result<MemoryAccessLog> {
2720        let log_id = Uuid::new_v4();
2721        let now = Utc::now();
2722
2723        let log_entry = sqlx::query_as!(
2724            MemoryAccessLog,
2725            r#"
2726            INSERT INTO memory_access_log (
2727                id, memory_id, access_type, session_id, user_context,
2728                query_context, retrieval_time_ms, similarity_score,
2729                ranking_position, importance_boost, access_count_increment,
2730                accessed_at
2731            ) VALUES (
2732                $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
2733            )
2734            RETURNING id, memory_id, access_type as "access_type: MemoryAccessType",
2735                     session_id, user_context, query_context, retrieval_time_ms,
2736                     similarity_score, ranking_position, importance_boost,
2737                     access_count_increment, accessed_at
2738            "#,
2739            log_id,
2740            request.memory_id,
2741            request.access_type as MemoryAccessType,
2742            request.session_id,
2743            request.user_context,
2744            request.query_context,
2745            request.retrieval_time_ms,
2746            request.similarity_score,
2747            request.ranking_position,
2748            request.importance_boost.unwrap_or(0.0),
2749            request.access_count_increment.unwrap_or(1),
2750            now
2751        )
2752        .fetch_one(&self.pool)
2753        .await
2754        .map_err(|e| MemoryError::DatabaseError {
2755            message: format!("Failed to create memory access log: {}", e),
2756        })?;
2757
2758        Ok(log_entry)
2759    }
2760    */
2761
2762    // ==========================================
2763    // System Metrics Management Methods
2764    // ==========================================
2765
2766    /* TEMPORARILY COMMENTED OUT - missing system_metrics_snapshots table
2767    /// Create a system metrics snapshot
2768    pub async fn create_system_metrics_snapshot(
2769        &self,
2770        snapshot_type: SystemMetricsSnapshotType,
2771    ) -> Result<SystemMetricsSnapshot> {
2772        let snapshot_id = Uuid::new_v4();
2773        let now = Utc::now();
2774
2775        // Get current memory tier statistics
2776        let tier_stats = sqlx::query!(
2777            r#"
2778            SELECT
2779                COUNT(*) FILTER (WHERE tier = 'working' AND status = 'active') as working_count,
2780                COUNT(*) FILTER (WHERE tier = 'warm' AND status = 'active') as warm_count,
2781                COUNT(*) FILTER (WHERE tier = 'cold' AND status = 'active') as cold_count,
2782                COUNT(*) FILTER (WHERE tier = 'frozen') as frozen_count,
2783                SUM(LENGTH(content::text)) as total_storage_bytes
2784            FROM memories
2785            "#
2786        )
2787        .fetch_one(&self.pool)
2788        .await
2789        .map_err(|e| MemoryError::DatabaseError {
2790            message: format!("Failed to get memory tier statistics: {}", e),
2791        })?;
2792
2793        let snapshot = sqlx::query_as!(
2794            SystemMetricsSnapshot,
2795            r#"
2796            INSERT INTO system_metrics_snapshots (
2797                id, snapshot_type, working_memory_count, warm_memory_count,
2798                cold_memory_count, frozen_memory_count, total_storage_bytes,
2799                compressed_storage_bytes, average_compression_ratio,
2800                average_query_time_ms, p95_query_time_ms, p99_query_time_ms,
2801                slow_query_count, consolidation_backlog, migration_queue_size,
2802                failed_operations_count, vector_index_size_mb,
2803                vector_search_performance, database_cpu_percent,
2804                database_memory_mb, connection_count, active_connections,
2805                recorded_at
2806            ) VALUES (
2807                $1, $2, $3, $4, $5, $6, $7, 0, NULL, NULL, NULL, NULL, 0, 0, 0, 0,
2808                NULL, '{}', NULL, NULL, NULL, NULL, $8
2809            )
2810            RETURNING id, snapshot_type as "snapshot_type: SystemMetricsSnapshotType",
2811                     working_memory_count, warm_memory_count, cold_memory_count,
2812                     frozen_memory_count, total_storage_bytes, compressed_storage_bytes,
2813                     average_compression_ratio, average_query_time_ms, p95_query_time_ms,
2814                     p99_query_time_ms, slow_query_count, consolidation_backlog,
2815                     migration_queue_size, failed_operations_count, vector_index_size_mb,
2816                     vector_search_performance, database_cpu_percent, database_memory_mb,
2817                     connection_count, active_connections, recorded_at
2818            "#,
2819            snapshot_id,
2820            snapshot_type as SystemMetricsSnapshotType,
2821            tier_stats.working_count.unwrap_or(0) as i32,
2822            tier_stats.warm_count.unwrap_or(0) as i32,
2823            tier_stats.cold_count.unwrap_or(0) as i32,
2824            tier_stats.frozen_count.unwrap_or(0) as i32,
2825            tier_stats.total_storage_bytes.unwrap_or(0),
2826            now
2827        )
2828        .fetch_one(&self.pool)
2829        .await
2830        .map_err(|e| MemoryError::DatabaseError {
2831            message: format!("Failed to create system metrics snapshot: {}", e),
2832        })?;
2833
2834        Ok(snapshot)
2835    }
2836    */
2837
2838    /* TEMPORARILY COMMENTED OUT - missing system_metrics_snapshots table
2839    /// Get recent system metrics snapshots
2840    pub async fn get_recent_system_metrics_snapshots(
2841        &self,
2842        snapshot_type: Option<SystemMetricsSnapshotType>,
2843        limit: i32,
2844    ) -> Result<Vec<SystemMetricsSnapshot>> {
2845        let snapshots = match snapshot_type {
2846            Some(st) => {
2847                sqlx::query_as!(
2848                    SystemMetricsSnapshot,
2849                    r#"
2850                    SELECT id, snapshot_type as "snapshot_type: SystemMetricsSnapshotType",
2851                           working_memory_count, warm_memory_count, cold_memory_count,
2852                           frozen_memory_count, total_storage_bytes, compressed_storage_bytes,
2853                           average_compression_ratio, average_query_time_ms, p95_query_time_ms,
2854                           p99_query_time_ms, slow_query_count, consolidation_backlog,
2855                           migration_queue_size, failed_operations_count, vector_index_size_mb,
2856                           vector_search_performance, database_cpu_percent, database_memory_mb,
2857                           connection_count, active_connections, recorded_at
2858                    FROM system_metrics_snapshots
2859                    WHERE snapshot_type = $1
2860                    ORDER BY recorded_at DESC
2861                    LIMIT $2
2862                    "#,
2863                    st as SystemMetricsSnapshotType,
2864                    limit
2865                )
2866                .fetch_all(&self.pool)
2867                .await
2868            }
2869            None => {
2870                sqlx::query_as!(
2871                    SystemMetricsSnapshot,
2872                    r#"
2873                    SELECT id, snapshot_type as "snapshot_type: SystemMetricsSnapshotType",
2874                           working_memory_count, warm_memory_count, cold_memory_count,
2875                           frozen_memory_count, total_storage_bytes, compressed_storage_bytes,
2876                           average_compression_ratio, average_query_time_ms, p95_query_time_ms,
2877                           p99_query_time_ms, slow_query_count, consolidation_backlog,
2878                           migration_queue_size, failed_operations_count, vector_index_size_mb,
2879                           vector_search_performance, database_cpu_percent, database_memory_mb,
2880                           connection_count, active_connections, recorded_at
2881                    FROM system_metrics_snapshots
2882                    ORDER BY recorded_at DESC
2883                    LIMIT $1
2884                    "#,
2885                    limit
2886                )
2887                .fetch_all(&self.pool)
2888                .await
2889            }
2890        }
2891        .map_err(|e| MemoryError::DatabaseError {
2892            message: format!("Failed to get system metrics snapshots: {}", e),
2893        })?;
2894
2895        Ok(snapshots)
2896    }
2897    */
2898
2899    // ==========================================
2900    // Forgetting Mechanisms Methods
2901    // ==========================================
2902
2903    /// Get memories for forgetting processing (clean architecture)
2904    pub async fn get_memories_for_forgetting(
2905        &self,
2906        tier: MemoryTier,
2907        batch_size: usize,
2908    ) -> Result<Vec<Memory>> {
2909        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
2910
2911        // Add tier filter
2912        builder.add_tier_filter(&tier);
2913
2914        // Add condition for memories not updated recently (1 hour minimum)
2915        builder.add_condition("AND (updated_at IS NULL OR updated_at < NOW() - INTERVAL '1 hour')");
2916
2917        // Order by oldest first (NULLS FIRST for never-updated)
2918        builder.add_condition("ORDER BY updated_at ASC NULLS FIRST");
2919
2920        // Add pagination
2921        builder.add_pagination(batch_size, 0)?;
2922
2923        let query = builder.build_query();
2924        let query_with_params = builder.bind_parameters_as(sqlx::query_as::<_, Memory>(&query));
2925
2926        let memories = query_with_params.fetch_all(&self.pool).await?;
2927        Ok(memories)
2928    }
2929
2930    /// Batch update decay rates for forgetting mechanism
2931    pub async fn batch_update_decay_rates(
2932        &self,
2933        updates: &[(Uuid, f64)], // (memory_id, new_decay_rate)
2934    ) -> Result<usize> {
2935        if updates.is_empty() {
2936            return Ok(0);
2937        }
2938
2939        let mut tx = self.pool.begin().await?;
2940        let mut updated_count = 0;
2941
2942        for (memory_id, new_decay_rate) in updates {
2943            // Use direct SQL here as this is a simple update without complex query building
2944            // SafeQueryBuilder is primarily for search queries with dynamic conditions
2945            let result = sqlx::query(
2946                "UPDATE memories SET decay_rate = $1, updated_at = NOW() WHERE id = $2 AND status = 'active'"
2947            )
2948            .bind(new_decay_rate)
2949            .bind(memory_id)
2950            .execute(&mut *tx)
2951            .await?;
2952
2953            updated_count += result.rows_affected() as usize;
2954        }
2955
2956        tx.commit().await?;
2957        Ok(updated_count)
2958    }
2959
2960    /// Batch update importance scores for reinforcement learning
2961    pub async fn batch_update_importance_scores(
2962        &self,
2963        updates: &[(Uuid, f64)], // (memory_id, new_importance_score)
2964    ) -> Result<usize> {
2965        if updates.is_empty() {
2966            return Ok(0);
2967        }
2968
2969        let mut tx = self.pool.begin().await?;
2970        let mut updated_count = 0;
2971
2972        for (memory_id, new_importance_score) in updates {
2973            // Use direct SQL here as this is a simple update without complex query building
2974            // SafeQueryBuilder is primarily for search queries with dynamic conditions
2975            let result = sqlx::query(
2976                "UPDATE memories SET importance_score = $1, updated_at = NOW() WHERE id = $2 AND status = 'active'"
2977            )
2978            .bind(new_importance_score)
2979            .bind(memory_id)
2980            .execute(&mut *tx)
2981            .await?;
2982
2983            updated_count += result.rows_affected() as usize;
2984        }
2985
2986        tx.commit().await?;
2987        Ok(updated_count)
2988    }
2989
2990    /// Batch mark memories as deleted (soft delete for forgetting)
2991    pub async fn batch_soft_delete_memories(&self, memory_ids: &[Uuid]) -> Result<usize> {
2992        if memory_ids.is_empty() {
2993            return Ok(0);
2994        }
2995
2996        let mut tx = self.pool.begin().await?;
2997        let mut deleted_count = 0;
2998
2999        for memory_id in memory_ids {
3000            // Use direct SQL here as this is a simple update without complex query building
3001            // SafeQueryBuilder is primarily for search queries with dynamic conditions
3002            match sqlx::query(
3003                "UPDATE memories SET status = 'deleted', updated_at = NOW() WHERE id = $1 AND status = 'active'"
3004            )
3005            .bind(memory_id)
3006            .execute(&mut *tx)
3007            .await
3008            {
3009                Ok(result) => {
3010                    deleted_count += result.rows_affected() as usize;
3011                }
3012                Err(e) => {
3013                    warn!("Failed to soft delete memory {}: {}", memory_id, e);
3014                }
3015            }
3016        }
3017
3018        tx.commit().await?;
3019        Ok(deleted_count)
3020    }
3021}
3022
/// Aggregate counters describing the memory population — presumably filled by
/// a single statistics query elsewhere in the repository (TODO confirm the
/// producing query).
///
/// All fields are `Option` because SQL aggregates over an empty table return
/// NULL rather than zero.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize)]
pub struct MemoryStatistics {
    // Per-tier counts.
    pub working_count: Option<i64>,
    pub warm_count: Option<i64>,
    pub cold_count: Option<i64>,
    // Totals split by lifecycle status.
    pub total_active: Option<i64>,
    pub total_deleted: Option<i64>,
    // Mean importance score across the counted memories.
    pub avg_importance: Option<f64>,
    // Access-frequency aggregates (maximum and mean access count).
    pub max_access_count: Option<i32>,
    pub avg_access_count: Option<f64>,
}
3034
/// Lightweight consolidation summary — presumably filled by an aggregate query
/// elsewhere in the repository (TODO confirm the producing query).
///
/// Fields are `Option` because SQL aggregates over an empty table return NULL
/// rather than zero.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize)]
pub struct SimpleConsolidationStats {
    // Memories currently eligible for tier migration.
    pub migration_candidates: Option<i64>,
    // Memories whose consolidation strength is considered high.
    pub highly_consolidated: Option<i64>,
    // Mean consolidation strength / recall probability across active memories.
    pub avg_consolidation_strength: Option<f64>,
    pub avg_recall_probability: Option<f64>,
    // Memories accessed within the recent window used by the query.
    pub recently_accessed: Option<i64>,
    pub total_active_memories: Option<i64>,
}
3044
3045#[cfg(test)]
3046mod tests {
3047    use super::*;
3048
3049    #[test]
3050    fn test_content_hash_generation() {
3051        let content = "This is a test memory content";
3052        let hash1 = Memory::calculate_content_hash(content);
3053        let hash2 = Memory::calculate_content_hash(content);
3054
3055        assert_eq!(hash1, hash2);
3056        assert_eq!(hash1.len(), 64); // SHA-256 produces 64 hex characters
3057    }
3058
3059    #[test]
3060    fn test_should_migrate() {
3061        let mut memory = Memory::default();
3062
3063        // Working tier with very low importance and old memory should migrate
3064        memory.tier = MemoryTier::Working;
3065        memory.importance_score = 0.01;
3066        memory.consolidation_strength = 0.1;
3067        memory.access_count = 0;
3068        memory.last_accessed_at = Some(Utc::now() - chrono::Duration::days(30)); // Very old
3069        assert!(memory.should_migrate());
3070
3071        // Working tier with high importance should not migrate
3072        memory.importance_score = 0.9;
3073        memory.consolidation_strength = 8.0;
3074        memory.access_count = 100;
3075        memory.last_accessed_at = Some(Utc::now()); // Just accessed
3076        assert!(!memory.should_migrate());
3077
3078        // Cold tier with very low importance may migrate to frozen
3079        // based on the new math engine thresholds (0.3 for frozen migration)
3080        memory.tier = MemoryTier::Cold;
3081        memory.importance_score = 0.0;
3082        memory.last_accessed_at = Some(Utc::now() - chrono::Duration::days(30)); // Old memory
3083                                                                                 // This may or may not migrate depending on calculated recall probability
3084                                                                                 // So we test both scenarios
3085
3086        // Test Frozen tier - should never migrate
3087        memory.tier = MemoryTier::Frozen;
3088        assert!(!memory.should_migrate());
3089    }
3090
3091    #[test]
3092    fn test_next_tier() {
3093        let mut memory = Memory::default();
3094
3095        memory.tier = MemoryTier::Working;
3096        assert_eq!(memory.next_tier(), Some(MemoryTier::Warm));
3097
3098        memory.tier = MemoryTier::Warm;
3099        assert_eq!(memory.next_tier(), Some(MemoryTier::Cold));
3100
3101        memory.tier = MemoryTier::Cold;
3102        assert_eq!(memory.next_tier(), Some(MemoryTier::Frozen));
3103
3104        memory.tier = MemoryTier::Frozen;
3105        assert_eq!(memory.next_tier(), None);
3106    }
3107
3108    #[test]
3109    fn test_safe_query_builder_pagination_validation() {
3110        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories");
3111
3112        // Test valid pagination
3113        assert!(builder.add_pagination(100, 50).is_ok());
3114
3115        // Test excessive limit
3116        let mut builder2 = SafeQueryBuilder::new("SELECT * FROM memories");
3117        assert!(builder2.add_pagination(20000, 0).is_err());
3118
3119        // Test excessive offset
3120        let mut builder3 = SafeQueryBuilder::new("SELECT * FROM memories");
3121        assert!(builder3.add_pagination(10, 2000000).is_err());
3122    }
3123
3124    #[test]
3125    fn test_safe_query_builder_parameterization() {
3126        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3127
3128        // Add tier filter
3129        let tier = MemoryTier::Working;
3130        builder.add_tier_filter(&tier);
3131
3132        // Add importance range
3133        builder.add_importance_range(Some(0.5), Some(0.9));
3134
3135        // Add pagination
3136        builder.add_pagination(10, 0).unwrap();
3137
3138        let query = builder.build_query();
3139
3140        // Verify parameterized placeholders are used
3141        assert!(query.contains("$1"));
3142        assert!(query.contains("$2"));
3143        assert!(query.contains("$3"));
3144        assert!(query.contains("$4"));
3145        assert!(query.contains("$5"));
3146
3147        // Verify no raw values are interpolated
3148        assert!(!query.contains("0.5"));
3149        assert!(!query.contains("0.9"));
3150        assert!(!query.contains("Working"));
3151    }
3152
3153    #[test]
3154    fn test_safe_query_builder_sql_injection_prevention() {
3155        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3156
3157        // Try to inject malicious tier (would normally be validated by the type system)
3158        let tier = MemoryTier::Working;
3159        builder.add_tier_filter(&tier);
3160
3161        // Try to inject through importance range
3162        builder.add_importance_range(Some(0.1), Some(1.0));
3163
3164        let query = builder.build_query();
3165
3166        // Verify that we have parameterized placeholders, not raw values
3167        assert!(query.contains("tier = $"));
3168        assert!(query.contains("importance_score >= $"));
3169        assert!(query.contains("importance_score <= $"));
3170
3171        // Verify no SQL injection patterns
3172        assert!(!query.contains("'; DROP TABLE"));
3173        assert!(!query.contains("OR 1=1"));
3174        assert!(!query.contains("UNION SELECT"));
3175    }
3176
3177    #[test]
3178    fn test_consolidation_strength_range_parameterization() {
3179        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3180
3181        builder.add_consolidation_strength_range(Some(1.0), Some(5.0));
3182        builder.add_recall_probability_range(Some(0.1), Some(0.9));
3183
3184        let query = builder.build_query();
3185
3186        // Verify parameterization
3187        assert!(query.contains("consolidation_strength >= $"));
3188        assert!(query.contains("consolidation_strength <= $"));
3189        assert!(query.contains("recall_probability >= $"));
3190        assert!(query.contains("recall_probability <= $"));
3191
3192        // Verify no raw values
3193        assert!(!query.contains("1.0"));
3194        assert!(!query.contains("5.0"));
3195        assert!(!query.contains("0.1"));
3196        assert!(!query.contains("0.9"));
3197    }
3198
3199    #[test]
3200    fn test_date_range_parameterization() {
3201        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3202
3203        let start_date = Utc::now() - chrono::Duration::days(30);
3204        let end_date = Utc::now();
3205
3206        builder.add_date_range(Some(&start_date), Some(&end_date));
3207
3208        let query = builder.build_query();
3209
3210        // Verify parameterization
3211        assert!(query.contains("created_at >= $"));
3212        assert!(query.contains("created_at <= $"));
3213
3214        // Verify no raw date strings (which would be vulnerable)
3215        assert!(!query.contains(&start_date.format("%Y-%m-%d").to_string()));
3216        assert!(!query.contains(&end_date.format("%Y-%m-%d").to_string()));
3217    }
3218
3219    #[test]
3220    fn test_similarity_threshold_parameterization() {
3221        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3222
3223        // This would be called after the first bind parameter (embedding)
3224        builder.bind_index = 2; // Simulate that $1 is already used for embedding
3225        builder.add_similarity_threshold(0.7);
3226
3227        let query = builder.build_query();
3228
3229        // Verify parameterization with correct bind index
3230        assert!(query.contains("embedding <=> $1"));
3231        assert!(query.contains(">= $2"));
3232
3233        // Verify no raw threshold value
3234        assert!(!query.contains("0.7"));
3235    }
3236
3237    #[test]
3238    fn test_query_builder_prevents_format_injection() {
3239        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3240
3241        // Test that all potentially dangerous inputs are parameterized
3242        let malicious_tier = MemoryTier::Working; // Type safety prevents actual injection here
3243        builder.add_tier_filter(&malicious_tier);
3244
3245        // Test importance ranges that could contain injection attempts
3246        builder.add_importance_range(Some(0.0), Some(1.0));
3247
3248        // Test exclude frozen functionality
3249        builder.add_exclude_frozen(true);
3250
3251        let query = builder.build_query();
3252
3253        // Verify all user inputs are parameterized
3254        assert!(query.matches('$').count() >= 3); // At least $1, $2, $3
3255
3256        // Verify static parts are hardcoded safely
3257        assert!(query.contains("tier != 'frozen'"));
3258        assert!(query.contains("status = 'active'"));
3259
3260        // Verify no format! patterns remain
3261        assert!(!query.contains("{}"));
3262        assert!(!query.contains("{:"));
3263    }
3264
3265    #[test]
3266    fn test_complex_query_building_safety() {
3267        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3268
3269        // Build a complex query with multiple filters
3270        builder.add_tier_filter(&MemoryTier::Working);
3271        builder.add_importance_range(Some(0.3), Some(0.8));
3272
3273        let start_date = Utc::now() - chrono::Duration::days(7);
3274        let end_date = Utc::now();
3275        builder.add_date_range(Some(&start_date), Some(&end_date));
3276
3277        builder.add_consolidation_strength_range(Some(2.0), None);
3278        builder.add_recall_probability_range(None, Some(0.9));
3279        builder.add_exclude_frozen(true);
3280
3281        builder.add_condition("ORDER BY importance_score DESC");
3282        builder.add_pagination(50, 100).unwrap();
3283
3284        let query = builder.build_query();
3285
3286        // Verify the query is well-formed and safe
3287        assert!(query.starts_with("SELECT * FROM memories WHERE status = 'active'"));
3288        assert!(query.contains("ORDER BY importance_score DESC"));
3289        assert!(query.contains("LIMIT"));
3290        assert!(query.contains("OFFSET"));
3291
3292        // Verify all user inputs are parameterized
3293        let param_count = query.matches('$').count();
3294        assert!(param_count >= 7); // tier, importance_min, importance_max, date_start, date_end, consolidation_min, recall_max, limit, offset
3295
3296        // Verify no SQL injection patterns
3297        assert!(!query.contains("'; "));
3298        assert!(!query.contains("OR 1=1"));
3299        assert!(!query.contains("UNION"));
3300        assert!(!query.contains("--"));
3301        assert!(!query.contains("/*"));
3302    }
3303
3304    #[tokio::test]
3305    async fn test_get_memories_for_forgetting_query_structure() {
3306        // Test the query building logic for the new forgetting method
3307        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3308
3309        // Add tier filter
3310        let tier = MemoryTier::Working;
3311        builder.add_tier_filter(&tier);
3312
3313        // Add condition for memories not updated recently (1 hour minimum)
3314        builder.add_condition("AND (updated_at IS NULL OR updated_at < NOW() - INTERVAL '1 hour')");
3315
3316        // Order by oldest first (NULLS FIRST for never-updated)
3317        builder.add_condition("ORDER BY updated_at ASC NULLS FIRST");
3318
3319        let query = builder.build_query();
3320
3321        // Verify query structure
3322        assert!(query.contains("SELECT * FROM memories WHERE status = 'active'"));
3323        assert!(query.contains("tier = $"));
3324        assert!(query.contains("updated_at IS NULL"));
3325        assert!(query.contains("updated_at < NOW() - INTERVAL '1 hour'"));
3326        assert!(query.contains("ORDER BY updated_at ASC NULLS FIRST"));
3327
3328        // Verify no SQL injection
3329        assert!(!query.contains("'; DROP"));
3330        assert!(!query.contains("OR 1=1"));
3331    }
3332
3333    #[test]
3334    fn test_clean_architecture_layer_separation() {
3335        // Test that repository methods follow clean architecture principles
3336        // This test validates that:
3337        // 1. Repository methods use SafeQueryBuilder for SQL safety
3338        // 2. Service layer doesn't need to know SQL details
3339        // 3. Domain logic is separated from data access
3340
3341        let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3342
3343        // Test that SafeQueryBuilder abstracts SQL complexity
3344        builder.add_tier_filter(&MemoryTier::Working);
3345        builder.add_condition("AND (updated_at IS NULL OR updated_at < NOW() - INTERVAL '1 hour')");
3346
3347        let query = builder.build_query();
3348
3349        // Verify abstraction works properly
3350        assert!(query.len() > 50); // Complex query was built
3351        assert!(query.contains("$")); // Parameterized
3352        assert!(!query.contains("'working'")); // No direct value injection
3353
3354        // Verify the builder maintains SQL safety while providing flexibility
3355        assert!(!query.contains("'; ")); // No SQL injection patterns
3356    }
3357
3358    #[test]
3359    fn test_repository_method_signatures() {
3360        // Test that new methods have correct signatures for clean architecture
3361        // This ensures service layer can call repository methods without SQL knowledge
3362
3363        // Mock testing - verify method signatures exist and are properly typed
3364        // In a real integration test, this would use a test database
3365
3366        // Verify get_memories_for_forgetting signature
3367        let _test_fn: fn(
3368            &MemoryRepository,
3369            MemoryTier,
3370            usize,
3371        ) -> std::pin::Pin<
3372            Box<dyn std::future::Future<Output = Result<Vec<Memory>>> + Send + '_>,
3373        > = |repo, tier, batch_size| Box::pin(repo.get_memories_for_forgetting(tier, batch_size));
3374
3375        // The fact this compiles confirms the method signature is correct
3376        assert!(true);
3377    }
3378}