1use super::error::{MemoryError, Result};
2use super::event_triggers::EventTriggeredScoringEngine;
3use super::math_engine::constants;
4use super::models::*;
5use crate::config::Config;
6use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
7use base64::Engine;
8use chrono::{DateTime, Utc};
9use pgvector::Vector;
10use sqlx::postgres::types::PgInterval;
11use sqlx::{PgPool, Postgres, Row, Transaction};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Instant;
15use tracing::{debug, info, warn};
16use uuid::Uuid;
17
/// Postgres-backed repository for memory CRUD, search, tier migration,
/// and maintenance (expiry cleanup, statistics).
pub struct MemoryRepository {
    // Connection pool shared by every query this repository issues.
    pool: PgPool,
    // Optional engine that boosts importance when content matches event
    // triggers; when `None`, `create_memory_*` skips trigger analysis.
    trigger_engine: Option<Arc<EventTriggeredScoringEngine>>,
    // Optional configuration; when `None`, working-tier capacity checks and
    // pressure reporting are disabled.
    config: Option<Config>,
}
23
/// Incrementally assembles a parameterized SQL query, keeping the SQL text
/// and its bind values in lockstep so user-supplied data is never
/// interpolated into the query string.
#[derive(Debug, Clone)]
pub struct SafeQueryBuilder {
    // SQL fragments; `build_query` joins them with single spaces.
    query_parts: Vec<String>,
    // Bind values in the same order as the `$n` placeholders they back.
    parameters: Vec<QueryParameter>,
    // Next `$n` placeholder number (1-based, Postgres convention). Callers
    // that pre-bind `$1` themselves (e.g. a query embedding) bump this.
    bind_index: usize,
}
31
/// Typed bind value held by `SafeQueryBuilder` until the query is executed;
/// each variant maps to one `query.bind(...)` call in `bind_parameters*`.
#[derive(Debug, Clone)]
enum QueryParameter {
    Text(String),
    Integer(i64),
    Float(f64),
    DateTime(DateTime<Utc>),
    Tier(MemoryTier),
    Uuid(Uuid),
    Vector(Vector),
}
42
43impl SafeQueryBuilder {
44 pub fn new(base_query: &str) -> Self {
45 Self {
46 query_parts: vec![base_query.to_string()],
47 parameters: Vec::new(),
48 bind_index: 1,
49 }
50 }
51
52 pub fn add_condition(&mut self, condition: &str) -> &mut Self {
54 self.query_parts.push(condition.to_string());
55 self
56 }
57
58 pub fn add_tier_filter(&mut self, tier: &MemoryTier) -> &mut Self {
60 let condition = format!("AND m.tier = ${}", self.bind_index);
61 self.query_parts.push(condition);
62 self.parameters.push(QueryParameter::Tier(*tier));
63 self.bind_index += 1;
64 self
65 }
66
67 pub fn add_date_range(
69 &mut self,
70 start: Option<&DateTime<Utc>>,
71 end: Option<&DateTime<Utc>>,
72 ) -> &mut Self {
73 if let Some(start_date) = start {
74 let condition = format!("AND m.created_at >= ${}", self.bind_index);
75 self.query_parts.push(condition);
76 self.parameters.push(QueryParameter::DateTime(*start_date));
77 self.bind_index += 1;
78 }
79 if let Some(end_date) = end {
80 let condition = format!("AND m.created_at <= ${}", self.bind_index);
81 self.query_parts.push(condition);
82 self.parameters.push(QueryParameter::DateTime(*end_date));
83 self.bind_index += 1;
84 }
85 self
86 }
87
88 pub fn add_importance_range(&mut self, min: Option<f64>, max: Option<f64>) -> &mut Self {
90 if let Some(min_score) = min {
91 let condition = format!("AND m.importance_score >= ${}", self.bind_index);
92 self.query_parts.push(condition);
93 self.parameters.push(QueryParameter::Float(min_score));
94 self.bind_index += 1;
95 }
96 if let Some(max_score) = max {
97 let condition = format!("AND m.importance_score <= ${}", self.bind_index);
98 self.query_parts.push(condition);
99 self.parameters.push(QueryParameter::Float(max_score));
100 self.bind_index += 1;
101 }
102 self
103 }
104
105 pub fn add_similarity_threshold(&mut self, threshold: f64) -> &mut Self {
107 let condition = format!("AND (1 - (m.embedding <=> $1)) >= ${}", self.bind_index);
108 self.query_parts.push(condition);
109 self.parameters.push(QueryParameter::Float(threshold));
110 self.bind_index += 1;
111 self
112 }
113
114 pub fn add_consolidation_strength_range(
116 &mut self,
117 min: Option<f64>,
118 max: Option<f64>,
119 ) -> &mut Self {
120 if let Some(min_strength) = min {
121 let condition = format!("AND consolidation_strength >= ${}", self.bind_index);
122 self.query_parts.push(condition);
123 self.parameters.push(QueryParameter::Float(min_strength));
124 self.bind_index += 1;
125 }
126 if let Some(max_strength) = max {
127 let condition = format!("AND consolidation_strength <= ${}", self.bind_index);
128 self.query_parts.push(condition);
129 self.parameters.push(QueryParameter::Float(max_strength));
130 self.bind_index += 1;
131 }
132 self
133 }
134
135 pub fn add_recall_probability_range(
137 &mut self,
138 min: Option<f64>,
139 max: Option<f64>,
140 ) -> &mut Self {
141 if let Some(min_recall) = min {
142 let condition = format!("AND recall_probability >= ${}", self.bind_index);
143 self.query_parts.push(condition);
144 self.parameters.push(QueryParameter::Float(min_recall));
145 self.bind_index += 1;
146 }
147 if let Some(max_recall) = max {
148 let condition = format!("AND recall_probability <= ${}", self.bind_index);
149 self.query_parts.push(condition);
150 self.parameters.push(QueryParameter::Float(max_recall));
151 self.bind_index += 1;
152 }
153 self
154 }
155
156 pub fn add_exclude_frozen(&mut self, exclude: bool) -> &mut Self {
158 if exclude {
159 self.query_parts.push("AND tier != 'frozen'".to_string());
160 }
161 self
162 }
163
164 pub fn add_last_access_interval(&mut self, hours: f64) -> &mut Self {
166 let condition = format!(
167 "AND (last_accessed_at IS NULL OR last_accessed_at < NOW() - INTERVAL '${} hours')",
168 self.bind_index
169 );
170 self.query_parts.push(condition);
171 self.parameters
172 .push(QueryParameter::Text(hours.to_string()));
173 self.bind_index += 1;
174 self
175 }
176
177 pub fn add_recall_threshold_condition(&mut self, threshold: f64) -> &mut Self {
179 self.parameters.push(QueryParameter::Float(threshold));
181 self.bind_index += 1;
182 self
183 }
184
185 pub fn add_pagination(&mut self, limit: usize, offset: usize) -> Result<&mut Self> {
187 if limit > 10000 {
189 return Err(MemoryError::InvalidRequest {
190 message: "Limit cannot exceed 10000 for performance reasons".to_string(),
191 });
192 }
193 if offset > 1000000 {
194 return Err(MemoryError::InvalidRequest {
195 message: "Offset cannot exceed 1000000 for performance reasons".to_string(),
196 });
197 }
198
199 let condition = format!("LIMIT ${} OFFSET ${}", self.bind_index, self.bind_index + 1);
200 self.query_parts.push(condition);
201 self.parameters.push(QueryParameter::Integer(limit as i64));
202 self.parameters.push(QueryParameter::Integer(offset as i64));
203 self.bind_index += 2;
204 Ok(self)
205 }
206
207 pub fn build_query(&self) -> String {
209 self.query_parts.join(" ")
210 }
211
212 pub fn bind_parameters<'a>(
214 &'a self,
215 mut query: sqlx::query::Query<'a, Postgres, sqlx::postgres::PgArguments>,
216 ) -> sqlx::query::Query<'a, Postgres, sqlx::postgres::PgArguments> {
217 for param in &self.parameters {
218 query = match param {
219 QueryParameter::Text(s) => query.bind(s),
220 QueryParameter::Integer(i) => query.bind(*i),
221 QueryParameter::Float(f) => query.bind(*f),
222 QueryParameter::DateTime(dt) => query.bind(*dt),
223 QueryParameter::Tier(tier) => query.bind(tier),
224 QueryParameter::Uuid(uuid) => query.bind(*uuid),
225 QueryParameter::Vector(vec) => query.bind(vec),
226 };
227 }
228 query
229 }
230
231 pub fn bind_parameters_as<'a, T>(
233 &'a self,
234 mut query: sqlx::query::QueryAs<'a, Postgres, T, sqlx::postgres::PgArguments>,
235 ) -> sqlx::query::QueryAs<'a, Postgres, T, sqlx::postgres::PgArguments>
236 where
237 T: for<'r> sqlx::FromRow<'r, sqlx::postgres::PgRow>,
238 {
239 for param in &self.parameters {
240 query = match param {
241 QueryParameter::Text(s) => query.bind(s),
242 QueryParameter::Integer(i) => query.bind(*i),
243 QueryParameter::Float(f) => query.bind(*f),
244 QueryParameter::DateTime(dt) => query.bind(*dt),
245 QueryParameter::Tier(tier) => query.bind(tier),
246 QueryParameter::Uuid(uuid) => query.bind(*uuid),
247 QueryParameter::Vector(vec) => query.bind(vec),
248 };
249 }
250 query
251 }
252}
253
254impl MemoryRepository {
255 pub fn new(pool: PgPool) -> Self {
256 Self {
257 pool,
258 trigger_engine: None,
259 config: None,
260 }
261 }
262
263 pub fn with_config(pool: PgPool, config: Config) -> Self {
264 Self {
265 pool,
266 trigger_engine: None,
267 config: Some(config),
268 }
269 }
270
271 pub fn with_trigger_engine(
272 pool: PgPool,
273 trigger_engine: Arc<EventTriggeredScoringEngine>,
274 ) -> Self {
275 Self {
276 pool,
277 trigger_engine: Some(trigger_engine),
278 config: None,
279 }
280 }
281
282 pub fn with_config_and_trigger_engine(
283 pool: PgPool,
284 config: Config,
285 trigger_engine: Arc<EventTriggeredScoringEngine>,
286 ) -> Self {
287 Self {
288 pool,
289 trigger_engine: Some(trigger_engine),
290 config: Some(config),
291 }
292 }
293
    /// Borrow the underlying connection pool (used by collaborating engines).
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }
297
298 pub async fn create_memory(&self, request: CreateMemoryRequest) -> Result<Memory> {
299 self.create_memory_with_user_context(request, None).await
300 }
301
302 pub async fn create_memory_with_user_context(
303 &self,
304 request: CreateMemoryRequest,
305 user_id: Option<&str>,
306 ) -> Result<Memory> {
307 let id = Uuid::new_v4();
308 let content_hash = Memory::calculate_content_hash(&request.content);
309 let tier = request.tier.unwrap_or(MemoryTier::Working);
310
311 let skip_duplicate_check =
313 std::env::var("SKIP_DUPLICATE_CHECK").unwrap_or_else(|_| "false".to_string()) == "true";
314
315 if !skip_duplicate_check {
316 let duplicate_exists = sqlx::query_scalar::<_, bool>(
317 "SELECT EXISTS(SELECT 1 FROM memories WHERE content_hash = $1 AND tier = $2 AND status = 'active')"
318 )
319 .bind(&content_hash)
320 .bind(tier)
321 .fetch_one(&self.pool)
322 .await?;
323
324 if duplicate_exists {
325 return Err(MemoryError::DuplicateContent {
326 tier: format!("{tier:?}"),
327 });
328 }
329 }
330
331 if tier == MemoryTier::Working {
333 if let Some(ref config) = self.config {
334 let working_count: i64 = sqlx::query_scalar(
335 "SELECT COUNT(*) FROM memories WHERE tier = 'working' AND status = 'active'",
336 )
337 .fetch_one(&self.pool)
338 .await?;
339
340 if working_count >= config.tier_config.working_tier_limit as i64 {
341 info!(
343 "Working memory at capacity ({}/{}), applying LRU eviction",
344 working_count, config.tier_config.working_tier_limit
345 );
346
347 let lru_memory_id: Option<Uuid> = sqlx::query_scalar(
349 "SELECT id FROM memories
350 WHERE tier = 'working' AND status = 'active'
351 ORDER BY last_accessed ASC
352 LIMIT 1",
353 )
354 .fetch_optional(&self.pool)
355 .await?;
356
357 if let Some(memory_id) = lru_memory_id {
358 sqlx::query(
360 "UPDATE memories SET tier = 'warm', updated_at = NOW()
361 WHERE id = $1",
362 )
363 .bind(memory_id)
364 .execute(&self.pool)
365 .await?;
366
367 info!(
368 "Evicted LRU memory {} from working to warm tier due to capacity limit",
369 memory_id
370 );
371
372 } else {
375 return Err(MemoryError::StorageExhausted {
377 tier: "working".to_string(),
378 limit: config.tier_config.working_tier_limit,
379 });
380 }
381 }
382 }
383 }
384
385 let (final_importance_score, trigger_result) = if let Some(trigger_engine) =
387 &self.trigger_engine
388 {
389 let original_importance = request.importance_score.unwrap_or(0.5);
390
391 match trigger_engine
392 .analyze_content(&request.content, original_importance, user_id)
393 .await
394 {
395 Ok(result) => {
396 if result.triggered {
397 info!(
398 "Memory triggered event: {:?} (confidence: {:.2}, boosted: {:.2} -> {:.2})",
399 result.trigger_type, result.confidence, result.original_importance, result.boosted_importance
400 );
401 (result.boosted_importance, Some(result))
402 } else {
403 (original_importance, Some(result))
404 }
405 }
406 Err(e) => {
407 warn!("Failed to analyze content for triggers: {}", e);
408 (request.importance_score.unwrap_or(0.5), None)
409 }
410 }
411 } else {
412 (request.importance_score.unwrap_or(0.5), None)
413 };
414
415 let embedding = request.embedding.map(Vector::from);
416
417 let mut metadata = request.metadata.unwrap_or(serde_json::json!({}));
419 if let Some(trigger_result) = &trigger_result {
420 if trigger_result.triggered {
421 metadata["trigger_info"] = serde_json::json!({
422 "triggered": true,
423 "trigger_type": trigger_result.trigger_type,
424 "confidence": trigger_result.confidence,
425 "original_importance": trigger_result.original_importance,
426 "boosted_importance": trigger_result.boosted_importance,
427 "processing_time_ms": trigger_result.processing_time.as_millis()
428 });
429 }
430 }
431
432 let memory = sqlx::query_as::<_, Memory>(
433 r#"
434 INSERT INTO memories (
435 id, content, content_hash, embedding, tier, status,
436 importance_score, metadata, parent_id, expires_at,
437 consolidation_strength, decay_rate
438 )
439 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
440 RETURNING *
441 "#,
442 )
443 .bind(id)
444 .bind(&request.content)
445 .bind(&content_hash)
446 .bind(embedding)
447 .bind(tier)
448 .bind(MemoryStatus::Active)
449 .bind(final_importance_score)
450 .bind(metadata)
451 .bind(request.parent_id)
452 .bind(request.expires_at)
453 .bind(1.0_f64) .bind(1.0_f64) .fetch_one(&self.pool)
456 .await?;
457
458 info!(
459 "Created memory {} in tier {:?} with importance {:.2}",
460 memory.id, memory.tier, final_importance_score
461 );
462 Ok(memory)
463 }
464
    /// Fetches an active memory by id, atomically bumping its access count
    /// and `last_accessed_at` in the same statement (read-through touch).
    ///
    /// # Errors
    /// `NotFound` if no active memory has this id.
    pub async fn get_memory(&self, id: Uuid) -> Result<Memory> {
        let memory = sqlx::query_as::<_, Memory>(
            r#"
            UPDATE memories
            SET access_count = access_count + 1,
                last_accessed_at = NOW()
            WHERE id = $1 AND status = 'active'
            RETURNING *
            "#,
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?
        .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;

        debug!("Retrieved memory {} from tier {:?}", id, memory.tier);
        Ok(memory)
    }
483
    /// Updates a memory inside a transaction, merging each request field
    /// with the current row (request value wins when present). Records a
    /// migration-history row if the tier changed.
    ///
    /// # Errors
    /// `NotFound` if no active memory has this id.
    pub async fn update_memory(&self, id: Uuid, request: UpdateMemoryRequest) -> Result<Memory> {
        let mut tx = self.pool.begin().await?;

        // Row-lock the current version so concurrent updates serialize.
        let current = sqlx::query_as::<_, Memory>(
            "SELECT * FROM memories WHERE id = $1 AND status = 'active' FOR UPDATE",
        )
        .bind(id)
        .fetch_optional(&mut *tx)
        .await?
        .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;

        // Hash is recomputed only when content actually changes.
        let content = request.content.as_ref().unwrap_or(&current.content);
        let content_hash = if request.content.is_some() {
            Memory::calculate_content_hash(content)
        } else {
            current.content_hash.clone()
        };

        let embedding = request.embedding.map(Vector::from).or(current.embedding);
        let tier = request.tier.unwrap_or(current.tier);
        let importance_score = request.importance_score.unwrap_or(current.importance_score);
        let metadata = request.metadata.as_ref().unwrap_or(&current.metadata);
        let expires_at = request.expires_at.or(current.expires_at);

        let updated = sqlx::query_as::<_, Memory>(
            r#"
            UPDATE memories
            SET content = $2, content_hash = $3, embedding = $4, tier = $5,
                importance_score = $6, metadata = $7, expires_at = $8,
                updated_at = NOW()
            WHERE id = $1
            RETURNING *
            "#,
        )
        .bind(id)
        .bind(content)
        .bind(&content_hash)
        .bind(embedding)
        .bind(tier)
        .bind(importance_score)
        .bind(metadata)
        .bind(expires_at)
        .fetch_one(&mut *tx)
        .await?;

        // A tier change via update is still a migration for audit purposes.
        if current.tier != tier {
            self.record_migration(
                &mut tx,
                id,
                current.tier,
                tier,
                Some("Manual update".to_string()),
            )
            .await?;
        }

        tx.commit().await?;
        info!("Updated memory {}", id);
        Ok(updated)
    }
547
548 pub async fn delete_memory(&self, id: Uuid) -> Result<()> {
549 let result = sqlx::query(
550 "UPDATE memories SET status = 'deleted' WHERE id = $1 AND status = 'active'",
551 )
552 .bind(id)
553 .execute(&self.pool)
554 .await?;
555
556 if result.rows_affected() == 0 {
557 return Err(MemoryError::NotFound { id: id.to_string() });
558 }
559
560 info!("Soft deleted memory {}", id);
561 Ok(())
562 }
563
    /// Runs a memory-aware ("enhanced") search by delegating to
    /// `MemoryAwareRetrievalEngine` with its default configuration.
    pub async fn search_memories_enhanced(
        &self,
        request: crate::memory::enhanced_retrieval::MemoryAwareSearchRequest,
    ) -> Result<crate::memory::enhanced_retrieval::MemoryAwareSearchResponse> {
        use crate::memory::enhanced_retrieval::*;

        let config = EnhancedRetrievalConfig::default();
        // A fresh repository wrapper around the same pool; trigger engine and
        // config are deliberately not propagated to the inner repository.
        let retrieval_engine = MemoryAwareRetrievalEngine::new(
            config,
            std::sync::Arc::new(MemoryRepository::new(self.pool.clone())),
            None,
        );

        retrieval_engine.search(request).await
    }
580
    /// Top-level search entry point: dispatches on search type (default
    /// semantic) and assembles the full response (facets, suggestions,
    /// pagination cursor, timing).
    pub async fn search_memories(&self, request: SearchRequest) -> Result<SearchResponse> {
        let start_time = Instant::now();

        let search_type = request
            .search_type
            .as_ref()
            .unwrap_or(&SearchType::Semantic)
            .clone();
        let limit = request.limit.unwrap_or(10);
        let offset = request.offset.unwrap_or(0);

        let results = match search_type {
            SearchType::Semantic => self.semantic_search(&request).await?,
            SearchType::Temporal => self.temporal_search(&request).await?,
            SearchType::Hybrid => self.hybrid_search(&request).await?,
            SearchType::FullText => self.fulltext_search(&request).await?,
        };

        // Total count and facets are only computed when facets are requested
        // (both require extra queries).
        let total_count = if request.include_facets.unwrap_or(false) {
            Some(self.count_search_results(&request).await?)
        } else {
            None
        };

        let facets = if request.include_facets.unwrap_or(false) {
            Some(self.generate_search_facets(&request).await?)
        } else {
            None
        };

        let suggestions = if request.query_text.is_some() {
            Some(self.generate_query_suggestions(&request).await?)
        } else {
            None
        };

        // A full page implies there may be more rows; emit a cursor for the
        // next page.
        let next_cursor = if results.len() as i32 >= limit {
            Some(self.generate_cursor(offset + limit as i64, &request))
        } else {
            None
        };

        let execution_time_ms = start_time.elapsed().as_millis() as u64;

        Ok(SearchResponse {
            results,
            total_count,
            facets,
            suggestions,
            next_cursor,
            execution_time_ms,
        })
    }
634
    /// Vector-similarity search using pgvector cosine distance (`<=>`),
    /// with optional tier/date/importance filters.
    ///
    /// # Errors
    /// `InvalidRequest` when the request carries no query embedding.
    async fn semantic_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
        let query_embedding = if let Some(ref embedding) = request.query_embedding {
            Vector::from(embedding.clone())
        } else {
            return Err(MemoryError::InvalidRequest {
                message: "Query embedding is required for semantic search".to_string(),
            });
        };

        let limit = request.limit.unwrap_or(10);
        let offset = request.offset.unwrap_or(0);
        let threshold = request.similarity_threshold.unwrap_or(0.7);

        let mut builder = SafeQueryBuilder::new(
            "SELECT m.*, 1 - (m.embedding <=> $1) as similarity_score FROM memories m WHERE m.status = 'active' AND m.embedding IS NOT NULL"
        );
        // $1 is reserved for the query embedding, bound separately below, so
        // builder-generated placeholders must start at $2.
        builder.bind_index = 2;

        self.add_filters_safe(request, &mut builder)?;

        builder.add_similarity_threshold(threshold as f64);

        builder.add_condition("ORDER BY similarity_score DESC");
        builder.add_pagination(limit as usize, offset as usize)?;

        let query = builder.build_query();
        // Bind the embedding first ($1), then the builder's parameters.
        let mut sqlx_query = sqlx::query(&query).bind(&query_embedding);
        sqlx_query = builder.bind_parameters(sqlx_query);

        let rows = sqlx_query.fetch_all(&self.pool).await?;

        self.build_search_results(rows, request).await
    }
674
    /// Recency-ordered search (newest first); similarity_score is fixed at
    /// 0.0 since no embedding comparison happens here.
    async fn temporal_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
        let limit = request.limit.unwrap_or(10);
        let offset = request.offset.unwrap_or(0);

        let mut builder = SafeQueryBuilder::new(
            "SELECT m.*, 0.0 as similarity_score FROM memories m WHERE m.status = 'active'",
        );

        self.add_filters_safe(request, &mut builder)?;

        builder.add_condition("ORDER BY m.created_at DESC, m.updated_at DESC");
        builder.add_pagination(limit as usize, offset as usize)?;

        let query = builder.build_query();
        let sqlx_query = builder.bind_parameters(sqlx::query(&query));

        let rows = sqlx_query.fetch_all(&self.pool).await?;

        self.build_search_results(rows, request).await
    }
699
700 async fn hybrid_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
701 let _weights = request.hybrid_weights.as_ref().unwrap_or(&HybridWeights {
703 semantic_weight: 0.333,
704 temporal_weight: 0.333, importance_weight: 0.334,
706 access_frequency_weight: 0.0, });
708
709 let query_embedding = if let Some(ref embedding) = request.query_embedding {
710 Vector::from(embedding.clone())
711 } else {
712 return Err(MemoryError::InvalidRequest {
713 message: "Query embedding is required for hybrid search".to_string(),
714 });
715 };
716
717 let limit = request.limit.unwrap_or(10);
718 let offset = request.offset.unwrap_or(0);
719 let threshold = request.similarity_threshold.unwrap_or(0.5);
720
721 sqlx::query(
723 r#"
724 UPDATE memories
725 SET recency_score = calculate_recency_score(last_accessed_at, created_at, 0.005),
726 relevance_score = LEAST(1.0,
727 0.5 * importance_score +
728 0.3 * LEAST(1.0, access_count / 100.0) +
729 0.2
730 )
731 WHERE status = 'active' AND embedding IS NOT NULL
732 "#,
733 )
734 .execute(&self.pool)
735 .await?;
736
737 let query = format!(
739 r#"
740 SELECT m.*,
741 1 - (m.embedding <=> $1) as similarity_score,
742 m.recency_score as temporal_score,
743 m.importance_score,
744 m.relevance_score,
745 COALESCE(m.access_count, 0) as access_count,
746 m.combined_score as combined_score
747 FROM memories m
748 WHERE m.status = 'active'
749 AND m.embedding IS NOT NULL
750 AND 1 - (m.embedding <=> $1) >= {threshold}
751 ORDER BY m.combined_score DESC, similarity_score DESC
752 LIMIT {limit} OFFSET {offset}
753 "#
754 );
755
756 let rows = sqlx::query(&query)
757 .bind(&query_embedding)
758 .fetch_all(&self.pool)
759 .await?;
760
761 self.build_search_results(rows, request).await
762 }
763
764 async fn fulltext_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
765 let query_text =
766 request
767 .query_text
768 .as_ref()
769 .ok_or_else(|| MemoryError::InvalidRequest {
770 message: "Query text is required for full-text search".to_string(),
771 })?;
772
773 let limit = request.limit.unwrap_or(10);
774 let offset = request.offset.unwrap_or(0);
775
776 let query = format!(
777 r#"
778 SELECT m.*,
779 ts_rank_cd(to_tsvector('english', m.content), plainto_tsquery('english', $1)) as similarity_score
780 FROM memories m
781 WHERE m.status = 'active'
782 AND to_tsvector('english', m.content) @@ plainto_tsquery('english', $1)
783 ORDER BY similarity_score DESC
784 LIMIT {limit} OFFSET {offset}
785 "#
786 );
787
788 let rows = sqlx::query(&query)
789 .bind(query_text)
790 .fetch_all(&self.pool)
791 .await?;
792
793 self.build_search_results(rows, request).await
794 }
795
796 fn add_filters_safe(
798 &self,
799 request: &SearchRequest,
800 builder: &mut SafeQueryBuilder,
801 ) -> Result<()> {
802 if let Some(tier) = &request.tier {
803 builder.add_tier_filter(tier);
804 }
805
806 if let Some(date_range) = &request.date_range {
807 builder.add_date_range(date_range.start.as_ref(), date_range.end.as_ref());
808 }
809
810 if let Some(importance_range) = &request.importance_range {
811 builder.add_importance_range(
812 importance_range.min.map(|v| v as f64),
813 importance_range.max.map(|v| v as f64),
814 );
815 }
816
817 Ok(())
818 }
819
    /// Maps raw rows into `SearchResult`s, pulling score columns that may or
    /// may not exist depending on which search variant produced the rows
    /// (missing columns fall back to defaults via `unwrap_or` / `.ok()`).
    async fn build_search_results(
        &self,
        rows: Vec<sqlx::postgres::PgRow>,
        request: &SearchRequest,
    ) -> Result<Vec<SearchResult>> {
        let mut results = Vec::new();
        let explain_score = request.explain_score.unwrap_or(false);

        for row in rows {
            // Manual row -> struct mapping (rows carry extra computed
            // columns, so FromRow can't be used directly).
            let memory = Memory {
                id: row.try_get("id")?,
                content: row.try_get("content")?,
                content_hash: row.try_get("content_hash")?,
                embedding: row.try_get("embedding")?,
                tier: row.try_get("tier")?,
                status: row.try_get("status")?,
                importance_score: row.try_get("importance_score")?,
                access_count: row.try_get("access_count")?,
                last_accessed_at: row.try_get("last_accessed_at")?,
                metadata: row.try_get("metadata")?,
                parent_id: row.try_get("parent_id")?,
                created_at: row.try_get("created_at")?,
                updated_at: row.try_get("updated_at")?,
                expires_at: row.try_get("expires_at")?,
                // Defaults cover rows predating the consolidation columns.
                consolidation_strength: row.try_get("consolidation_strength").unwrap_or(1.0),
                decay_rate: row.try_get("decay_rate").unwrap_or(1.0),
                recall_probability: row.try_get("recall_probability")?,
                last_recall_interval: row.try_get("last_recall_interval")?,
                recency_score: row.try_get("recency_score").unwrap_or(0.0),
                relevance_score: row.try_get("relevance_score").unwrap_or(0.0),
            };

            // Score columns are variant-dependent; absent ones default.
            let similarity_score: f32 = row.try_get("similarity_score").unwrap_or(0.0);
            let combined_score: f32 = row.try_get("combined_score").unwrap_or(similarity_score);
            let temporal_score: Option<f32> = row.try_get("temporal_score").ok();
            let access_frequency_score: Option<f32> = row.try_get("access_frequency_score").ok();
            let importance_score = memory.importance_score;

            // Fixed-weight breakdown (0.4/0.3/0.2/0.1) for debugging only;
            // NOTE(review): these weights are not derived from the request's
            // HybridWeights — confirm whether they should be.
            let score_explanation = if explain_score {
                Some(ScoreExplanation {
                    semantic_contribution: similarity_score * 0.4,
                    temporal_contribution: temporal_score.unwrap_or(0.0) * 0.3,
                    importance_contribution: (importance_score * 0.2) as f32,
                    access_frequency_contribution: access_frequency_score.unwrap_or(0.0) * 0.1,
                    total_score: combined_score,
                    factors: vec![
                        "semantic similarity".to_string(),
                        "recency".to_string(),
                        "importance".to_string(),
                    ],
                })
            } else {
                None
            };

            results.push(SearchResult {
                memory,
                similarity_score,
                temporal_score,
                importance_score,
                access_frequency_score,
                combined_score,
                score_explanation,
            });
        }

        debug!("Built {} search results", results.len());
        Ok(results)
    }
889
    /// Total count used alongside facets.
    /// NOTE(review): counts ALL active memories and ignores the request's
    /// filters, so this can exceed the filtered result set — confirm whether
    /// that is intentional.
    async fn count_search_results(&self, _request: &SearchRequest) -> Result<i64> {
        let count: i64 =
            sqlx::query_scalar("SELECT COUNT(*) FROM memories WHERE status = 'active'")
                .fetch_one(&self.pool)
                .await?;
        Ok(count)
    }
898
    /// Builds search facets. Only the tier counts come from the database;
    /// the date histogram and importance ranges below are hardcoded
    /// placeholder values, and tag facets are not computed.
    async fn generate_search_facets(&self, _request: &SearchRequest) -> Result<SearchFacets> {
        let tier_rows: Vec<(String, i64)> = sqlx::query_as(
            "SELECT tier, COUNT(*) FROM memories WHERE status = 'active' GROUP BY tier",
        )
        .fetch_all(&self.pool)
        .await?;

        let mut tiers = HashMap::new();
        for (tier_str, count) in tier_rows {
            // Rows whose tier string fails to parse are silently skipped.
            if let Ok(tier) = tier_str.parse::<MemoryTier>() {
                tiers.insert(tier, count);
            }
        }

        // Placeholder single-bucket histogram (not derived from data).
        let date_histogram = vec![DateBucket {
            date: Utc::now(),
            count: 10,
            interval: "day".to_string(),
        }];

        // Placeholder importance buckets with fixed counts.
        let importance_ranges = vec![
            ImportanceRange {
                min: 0.0,
                max: 0.3,
                count: 5,
                label: "Low".to_string(),
            },
            ImportanceRange {
                min: 0.3,
                max: 0.7,
                count: 15,
                label: "Medium".to_string(),
            },
            ImportanceRange {
                min: 0.7,
                max: 1.0,
                count: 8,
                label: "High".to_string(),
            },
        ];

        Ok(SearchFacets {
            tiers,
            date_histogram,
            importance_ranges,
            tags: HashMap::new(), // tag faceting not implemented
        })
    }
950
951 async fn generate_query_suggestions(&self, _request: &SearchRequest) -> Result<Vec<String>> {
952 Ok(vec![
954 "recent code changes".to_string(),
955 "function definitions".to_string(),
956 "error handling patterns".to_string(),
957 ])
958 }
959
960 fn generate_cursor(&self, offset: i64, _request: &SearchRequest) -> String {
961 use base64::{engine::general_purpose::STANDARD, Engine};
963 STANDARD.encode(format!("offset:{offset}"))
964 }
965
966 pub async fn search_memories_simple(
968 &self,
969 request: SearchRequest,
970 ) -> Result<Vec<SearchResult>> {
971 let response = self.search_memories(request).await?;
972 Ok(response.results)
973 }
974
    /// Lists active memories in a tier, most important and most recently
    /// updated first. `limit` defaults to 100 when not given.
    pub async fn get_memories_by_tier(
        &self,
        tier: MemoryTier,
        limit: Option<i64>,
    ) -> Result<Vec<Memory>> {
        let limit = limit.unwrap_or(100);

        let memories = sqlx::query_as::<_, Memory>(
            r#"
            SELECT * FROM memories
            WHERE tier = $1 AND status = 'active'
            ORDER BY importance_score DESC, updated_at DESC
            LIMIT $2
            "#,
        )
        .bind(tier)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;

        Ok(memories)
    }
997
998 pub async fn migrate_memory(
999 &self,
1000 id: Uuid,
1001 to_tier: MemoryTier,
1002 reason: Option<String>,
1003 ) -> Result<Memory> {
1004 let mut tx = self.pool.begin().await?;
1005
1006 let current = sqlx::query_as::<_, Memory>(
1008 "SELECT * FROM memories WHERE id = $1 AND status = 'active' FOR UPDATE",
1009 )
1010 .bind(id)
1011 .fetch_optional(&mut *tx)
1012 .await?
1013 .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;
1014
1015 if current.tier == to_tier {
1016 return Ok(current);
1017 }
1018
1019 let valid_transition = match (current.tier, to_tier) {
1021 (MemoryTier::Working, MemoryTier::Warm)
1022 | (MemoryTier::Working, MemoryTier::Cold)
1023 | (MemoryTier::Warm, MemoryTier::Cold)
1024 | (MemoryTier::Warm, MemoryTier::Working)
1025 | (MemoryTier::Cold, MemoryTier::Warm) => true,
1026 _ => false,
1027 };
1028
1029 if !valid_transition {
1030 return Err(MemoryError::InvalidTierTransition {
1031 from: format!("{:?}", current.tier),
1032 to: format!("{to_tier:?}"),
1033 });
1034 }
1035
1036 let start = std::time::Instant::now();
1037
1038 let updated = sqlx::query_as::<_, Memory>(
1040 r#"
1041 UPDATE memories
1042 SET tier = $2, status = 'active', updated_at = NOW()
1043 WHERE id = $1
1044 RETURNING *
1045 "#,
1046 )
1047 .bind(id)
1048 .bind(to_tier)
1049 .fetch_one(&mut *tx)
1050 .await?;
1051
1052 let duration_ms = start.elapsed().as_millis() as i32;
1053
1054 self.record_migration(&mut tx, id, current.tier, to_tier, reason)
1056 .await?;
1057
1058 tx.commit().await?;
1059
1060 info!(
1061 "Migrated memory {} from {:?} to {:?} in {}ms",
1062 id, current.tier, to_tier, duration_ms
1063 );
1064
1065 Ok(updated)
1066 }
1067
    /// Inserts an audit row into `migration_history` within the caller's
    /// transaction (always with success = true; failures never reach here).
    async fn record_migration(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        memory_id: Uuid,
        from_tier: MemoryTier,
        to_tier: MemoryTier,
        reason: Option<String>,
    ) -> Result<()> {
        sqlx::query(
            r#"
            INSERT INTO migration_history (memory_id, from_tier, to_tier, migration_reason, success)
            VALUES ($1, $2, $3, $4, true)
            "#,
        )
        .bind(memory_id)
        .bind(from_tier)
        .bind(to_tier)
        .bind(reason)
        .execute(&mut **tx)
        .await?;

        Ok(())
    }
1091
1092 pub async fn get_expired_memories(&self) -> Result<Vec<Memory>> {
1093 let memories = sqlx::query_as::<_, Memory>(
1094 r#"
1095 SELECT * FROM memories
1096 WHERE status = 'active'
1097 AND expires_at IS NOT NULL
1098 AND expires_at < NOW()
1099 "#,
1100 )
1101 .fetch_all(&self.pool)
1102 .await?;
1103
1104 Ok(memories)
1105 }
1106
1107 pub async fn cleanup_expired_memories(&self) -> Result<u64> {
1108 let result = sqlx::query(
1109 r#"
1110 UPDATE memories
1111 SET status = 'deleted'
1112 WHERE status = 'active'
1113 AND expires_at IS NOT NULL
1114 AND expires_at < NOW()
1115 "#,
1116 )
1117 .execute(&self.pool)
1118 .await?;
1119
1120 let count = result.rows_affected();
1121 if count > 0 {
1122 info!("Cleaned up {} expired memories", count);
1123 }
1124
1125 Ok(count)
1126 }
1127
    /// Finds memories eligible for demotion out of `tier`, worst candidates
    /// first. Working-tier rows qualify when unimportant (< 0.3) or stale
    /// (> 24h since access); warm-tier rows when very unimportant (< 0.1)
    /// and untouched for a week. Cold and frozen tiers never demote.
    pub async fn get_migration_candidates(
        &self,
        tier: MemoryTier,
        limit: i64,
    ) -> Result<Vec<Memory>> {
        let query = match tier {
            MemoryTier::Working => {
                r#"
                SELECT * FROM memories
                WHERE tier = 'working'
                    AND status = 'active'
                    AND (
                        importance_score < 0.3
                        OR (last_accessed_at IS NOT NULL
                            AND last_accessed_at < NOW() - INTERVAL '24 hours')
                    )
                ORDER BY importance_score ASC, last_accessed_at ASC NULLS FIRST
                LIMIT $1
                "#
            }
            MemoryTier::Warm => {
                r#"
                SELECT * FROM memories
                WHERE tier = 'warm'
                    AND status = 'active'
                    AND importance_score < 0.1
                    AND updated_at < NOW() - INTERVAL '7 days'
                ORDER BY importance_score ASC, updated_at ASC
                LIMIT $1
                "#
            }
            MemoryTier::Cold => {
                // Cold memories are not auto-demoted.
                return Ok(Vec::new());
            }
            MemoryTier::Frozen => {
                // Frozen is terminal; nothing to migrate out.
                return Ok(Vec::new());
            }
        };

        let memories = sqlx::query_as::<_, Memory>(query)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;

        Ok(memories)
    }
1174
1175 pub async fn get_working_memory_pressure(&self) -> Result<f64> {
1177 if let Some(ref config) = self.config {
1178 let working_count: i64 = sqlx::query_scalar(
1179 "SELECT COUNT(*) FROM memories WHERE tier = 'working' AND status = 'active'",
1180 )
1181 .fetch_one(&self.pool)
1182 .await?;
1183
1184 let pressure = working_count as f64 / config.tier_config.working_tier_limit as f64;
1185 Ok(pressure.min(1.0))
1186 } else {
1187 Ok(0.0)
1188 }
1189 }
1190
1191 pub async fn get_statistics(&self) -> Result<MemoryStatistics> {
1192 let stats = sqlx::query_as::<_, MemoryStatistics>(
1193 r#"
1194 SELECT
1195 COUNT(*) FILTER (WHERE tier = 'working' AND status = 'active') as working_count,
1196 COUNT(*) FILTER (WHERE tier = 'warm' AND status = 'active') as warm_count,
1197 COUNT(*) FILTER (WHERE tier = 'cold' AND status = 'active') as cold_count,
1198 COUNT(*) FILTER (WHERE status = 'active') as total_active,
1199 COUNT(*) FILTER (WHERE status = 'deleted') as total_deleted,
1200 AVG(importance_score) FILTER (WHERE status = 'active') as avg_importance,
1201 MAX(access_count) FILTER (WHERE status = 'active') as max_access_count,
1202 CAST(AVG(access_count) FILTER (WHERE status = 'active') AS FLOAT8) as avg_access_count
1203 FROM memories
1204 "#,
1205 )
1206 .fetch_one(&self.pool)
1207 .await?;
1208
1209 Ok(stats)
1210 }
1211
1212 pub async fn get_consolidation_analytics(&self) -> Result<Vec<ConsolidationAnalytics>> {
1216 let analytics = sqlx::query_as::<_, ConsolidationAnalytics>(
1217 r#"
1218 SELECT
1219 tier,
1220 COUNT(*) as total_memories,
1221 AVG(consolidation_strength) as avg_consolidation_strength,
1222 AVG(recall_probability) as avg_recall_probability,
1223 AVG(decay_rate) as avg_decay_rate,
1224 AVG(EXTRACT(EPOCH FROM (NOW() - created_at)) / 86400) as avg_age_days,
1225 COUNT(*) FILTER (WHERE recall_probability < $1) as migration_candidates,
1226 COUNT(*) FILTER (WHERE last_accessed_at IS NULL) as never_accessed,
1227 COUNT(*) FILTER (WHERE last_accessed_at > NOW() - INTERVAL '24 hours') as accessed_recently
1228 FROM memories
1229 WHERE status = 'active'
1230 GROUP BY tier
1231 ORDER BY
1232 CASE tier
1233 WHEN 'working' THEN 1
1234 WHEN 'warm' THEN 2
1235 WHEN 'cold' THEN 3
1236 WHEN 'frozen' THEN 4
1237 END
1238 "#,
1239 )
1240 .bind(constants::FROZEN_MIGRATION_THRESHOLD)
1241 .fetch_all(&self.pool)
1242 .await?;
1243
1244 Ok(analytics)
1245 }
1246
1247 pub async fn find_migration_candidates(
1273 &self,
1274 tier: MemoryTier,
1275 limit: i32,
1276 ) -> Result<Vec<Memory>> {
1277 let threshold = match tier {
1278 MemoryTier::Working => 0.7,
1279 MemoryTier::Warm => 0.5,
1280 MemoryTier::Cold => 0.2,
1281 MemoryTier::Frozen => 0.0, };
1283
1284 let memories = sqlx::query_as::<_, Memory>(
1285 r#"
1286 SELECT * FROM memories
1287 WHERE tier = $1
1288 AND status = 'active'
1289 AND (recall_probability < $2 OR recall_probability IS NULL)
1290 ORDER BY recall_probability ASC NULLS LAST, consolidation_strength ASC
1291 LIMIT $3
1292 "#,
1293 )
1294 .bind(tier)
1295 .bind(threshold)
1296 .bind(limit)
1297 .fetch_all(&self.pool)
1298 .await?;
1299
1300 Ok(memories)
1301 }
1302
1303 pub async fn update_consolidation(
1305 &self,
1306 memory_id: Uuid,
1307 consolidation_strength: f64,
1308 decay_rate: f64,
1309 recall_probability: Option<f64>,
1310 ) -> Result<()> {
1311 sqlx::query(
1312 r#"
1313 UPDATE memories
1314 SET consolidation_strength = $2,
1315 decay_rate = $3,
1316 recall_probability = $4,
1317 updated_at = NOW()
1318 WHERE id = $1 AND status = 'active'
1319 "#,
1320 )
1321 .bind(memory_id)
1322 .bind(consolidation_strength)
1323 .bind(decay_rate)
1324 .bind(recall_probability)
1325 .execute(&self.pool)
1326 .await?;
1327
1328 Ok(())
1329 }
1330
1331 pub async fn log_consolidation_event(
1333 &self,
1334 memory_id: Uuid,
1335 event_type: &str,
1336 previous_strength: f64,
1337 new_strength: f64,
1338 previous_probability: Option<f64>,
1339 new_probability: Option<f64>,
1340 recall_interval: Option<PgInterval>,
1341 context: serde_json::Value,
1342 ) -> Result<()> {
1343 sqlx::query(
1344 r#"
1345 INSERT INTO memory_consolidation_log (
1346 memory_id, event_type, previous_consolidation_strength,
1347 new_consolidation_strength, previous_recall_probability,
1348 new_recall_probability, recall_interval, access_context
1349 )
1350 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
1351 "#,
1352 )
1353 .bind(memory_id)
1354 .bind(event_type)
1355 .bind(previous_strength)
1356 .bind(new_strength)
1357 .bind(previous_probability)
1358 .bind(new_probability)
1359 .bind(recall_interval)
1360 .bind(context)
1361 .execute(&self.pool)
1362 .await?;
1363
1364 Ok(())
1365 }
1366
1367 pub async fn freeze_memory(
1369 &self,
1370 memory_id: Uuid,
1371 reason: Option<String>,
1372 ) -> Result<FreezeMemoryResponse> {
1373 use super::compression::{FrozenMemoryCompression, ZstdCompressionEngine};
1374 use std::time::Instant;
1375
1376 let start_time = Instant::now();
1377 let mut tx = self.pool.begin().await?;
1378
1379 let memory = sqlx::query_as::<_, Memory>(
1381 "SELECT * FROM memories WHERE id = $1 AND status = 'active'",
1382 )
1383 .bind(memory_id)
1384 .fetch_optional(&mut *tx)
1385 .await?
1386 .ok_or_else(|| MemoryError::NotFound {
1387 id: memory_id.to_string(),
1388 })?;
1389
1390 if memory.tier != MemoryTier::Cold {
1392 return Err(MemoryError::InvalidRequest {
1393 message: format!(
1394 "Can only freeze memories in cold tier, found {:?}",
1395 memory.tier
1396 ),
1397 });
1398 }
1399
1400 let recall_probability = memory.recall_probability.unwrap_or(0.0);
1401 if recall_probability >= 0.2 {
1402 return Err(MemoryError::InvalidRequest {
1403 message: format!(
1404 "Can only freeze memories with P(r) < 0.2, found {recall_probability:.3}"
1405 ),
1406 });
1407 }
1408
1409 info!(
1410 "Freezing memory {} (P(r)={:.3}, content_length={})",
1411 memory_id,
1412 recall_probability,
1413 memory.content.len()
1414 );
1415
1416 let compression_engine = ZstdCompressionEngine::new();
1418 let compression_result =
1419 compression_engine.compress_memory_data(&memory.content, &memory.metadata)?;
1420
1421 FrozenMemoryCompression::validate_compression_quality(
1423 compression_result.compression_ratio,
1424 memory.content.len(),
1425 )?;
1426
1427 let (compressed_data, original_size, compressed_size, compression_ratio) =
1428 FrozenMemoryCompression::to_database_format(compression_result);
1429
1430 debug!(
1431 "Compression completed: {:.2}:1 ratio, {} -> {} bytes",
1432 compression_ratio, original_size, compressed_size
1433 );
1434
1435 let frozen_id = Uuid::new_v4();
1437 sqlx::query(
1438 r#"
1439 INSERT INTO frozen_memories (
1440 id, original_memory_id, compressed_content,
1441 original_metadata, original_content_hash, original_embedding,
1442 original_tier, freeze_reason, compression_ratio,
1443 original_size_bytes, compressed_size_bytes
1444 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
1445 "#,
1446 )
1447 .bind(frozen_id)
1448 .bind(memory.id)
1449 .bind(&compressed_data)
1450 .bind(&memory.metadata)
1451 .bind(&memory.content_hash)
1452 .bind(memory.embedding.as_ref())
1453 .bind(memory.tier)
1454 .bind(
1455 reason
1456 .as_deref()
1457 .unwrap_or("Auto-frozen: P(r) < 0.2 threshold"),
1458 )
1459 .bind(compression_ratio)
1460 .bind(original_size)
1461 .bind(compressed_size)
1462 .execute(&mut *tx)
1463 .await?;
1464
1465 sqlx::query(
1467 "UPDATE memories SET tier = 'frozen', status = 'archived', updated_at = NOW() WHERE id = $1"
1468 )
1469 .bind(memory_id)
1470 .execute(&mut *tx)
1471 .await?;
1472
1473 let processing_time_ms = start_time.elapsed().as_millis() as i32;
1475 sqlx::query(
1476 r#"
1477 INSERT INTO migration_history (
1478 memory_id, from_tier, to_tier, migration_reason,
1479 migration_duration_ms, success
1480 ) VALUES ($1, $2, 'frozen', $3, $4, true)
1481 "#,
1482 )
1483 .bind(memory_id)
1484 .bind(memory.tier)
1485 .bind(format!("Frozen with {compression_ratio:.2}:1 compression"))
1486 .bind(processing_time_ms)
1487 .execute(&mut *tx)
1488 .await?;
1489
1490 tx.commit().await?;
1491
1492 info!(
1493 "Successfully froze memory {} with {:.2}:1 compression in {}ms",
1494 memory_id, compression_ratio, processing_time_ms
1495 );
1496
1497 Ok(FreezeMemoryResponse {
1498 frozen_id,
1499 compression_ratio: Some(compression_ratio),
1500 original_tier: memory.tier,
1501 frozen_at: Utc::now(),
1502 })
1503 }
1504
1505 pub async fn unfreeze_memory(
1507 &self,
1508 frozen_id: Uuid,
1509 target_tier: Option<MemoryTier>,
1510 ) -> Result<UnfreezeMemoryResponse> {
1511 use super::compression::ZstdCompressionEngine;
1512 use rand::Rng;
1513 use std::time::Instant;
1514 use tokio::time::{sleep, Duration};
1515
1516 let start_time = Instant::now();
1517 let mut tx = self.pool.begin().await?;
1518
1519 let frozen_memory =
1521 sqlx::query_as::<_, FrozenMemory>("SELECT * FROM frozen_memories WHERE id = $1")
1522 .bind(frozen_id)
1523 .fetch_optional(&mut *tx)
1524 .await?
1525 .ok_or_else(|| MemoryError::NotFound {
1526 id: frozen_id.to_string(),
1527 })?;
1528
1529 info!(
1530 "Unfreezing memory {} (compression_ratio: {:.2}:1)",
1531 frozen_id,
1532 frozen_memory.compression_ratio.unwrap_or(0.0)
1533 );
1534
1535 let mut rng = rand::thread_rng();
1537 let delay_seconds = rng.gen_range(2..=5);
1538
1539 info!(
1540 "Applying {}-second intentional delay for frozen tier retrieval",
1541 delay_seconds
1542 );
1543 sleep(Duration::from_secs(delay_seconds)).await;
1544
1545 let compression_engine = ZstdCompressionEngine::new();
1547
1548 let compressed_data = match &frozen_memory.compressed_content {
1551 serde_json::Value::String(base64_data) => {
1552 BASE64_STANDARD
1554 .decode(base64_data.as_bytes())
1555 .map_err(|e| MemoryError::DecompressionError {
1556 message: format!("Failed to decode base64 compressed data: {e}"),
1557 })?
1558 }
1559 serde_json::Value::Array(byte_array) => {
1560 byte_array
1562 .iter()
1563 .map(|v| v.as_u64().unwrap_or(0) as u8)
1564 .collect()
1565 }
1566 _ => {
1567 return Err(MemoryError::DecompressionError {
1569 message: "Invalid compressed data format in database".to_string(),
1570 });
1571 }
1572 };
1573
1574 let decompressed_data = compression_engine.decompress_memory_data(&compressed_data)?;
1575
1576 debug!(
1577 "Decompression completed: restored {} bytes of content",
1578 decompressed_data.content.len()
1579 );
1580
1581 let restoration_tier = target_tier
1583 .or(Some(frozen_memory.original_tier))
1584 .unwrap_or(MemoryTier::Working);
1585
1586 let memory_id = frozen_memory.original_memory_id;
1588 let rows_affected = sqlx::query(
1589 r#"
1590 UPDATE memories
1591 SET
1592 content = $1,
1593 tier = $2,
1594 status = 'active',
1595 metadata = $3,
1596 updated_at = NOW()
1597 WHERE id = $4
1598 "#,
1599 )
1600 .bind(&decompressed_data.content)
1601 .bind(restoration_tier)
1602 .bind(&decompressed_data.metadata)
1603 .bind(memory_id)
1604 .execute(&mut *tx)
1605 .await?
1606 .rows_affected();
1607
1608 if rows_affected == 0 {
1609 sqlx::query(
1611 r#"
1612 INSERT INTO memories (
1613 id, content, content_hash, embedding, tier, status,
1614 importance_score, metadata, created_at, updated_at
1615 ) VALUES ($1, $2, $3, $4, $5, 'active', 0.5, $6, NOW(), NOW())
1616 "#,
1617 )
1618 .bind(memory_id)
1619 .bind(&decompressed_data.content)
1620 .bind(&frozen_memory.original_content_hash)
1621 .bind(frozen_memory.original_embedding.as_ref())
1622 .bind(restoration_tier)
1623 .bind(&decompressed_data.metadata)
1624 .execute(&mut *tx)
1625 .await?;
1626
1627 info!("Recreated deleted memory {} during unfreeze", memory_id);
1628 }
1629
1630 sqlx::query(
1632 r#"
1633 UPDATE frozen_memories
1634 SET
1635 unfreeze_count = COALESCE(unfreeze_count, 0) + 1,
1636 last_unfrozen_at = NOW(),
1637 updated_at = NOW()
1638 WHERE id = $1
1639 "#,
1640 )
1641 .bind(frozen_id)
1642 .execute(&mut *tx)
1643 .await?;
1644
1645 let processing_time_ms = start_time.elapsed().as_millis() as i32;
1647 sqlx::query(
1648 r#"
1649 INSERT INTO migration_history (
1650 memory_id, from_tier, to_tier, migration_reason,
1651 migration_duration_ms, success
1652 ) VALUES ($1, 'frozen', $2, $3, $4, true)
1653 "#,
1654 )
1655 .bind(memory_id)
1656 .bind(restoration_tier)
1657 .bind(format!("Unfrozen after {delay_seconds} second delay"))
1658 .bind(processing_time_ms)
1659 .execute(&mut *tx)
1660 .await?;
1661
1662 tx.commit().await?;
1663
1664 info!(
1665 "Successfully unfroze memory {} to {:?} tier in {}ms (including {}s delay)",
1666 memory_id, restoration_tier, processing_time_ms, delay_seconds
1667 );
1668
1669 Ok(UnfreezeMemoryResponse {
1670 memory_id,
1671 retrieval_delay_seconds: delay_seconds as i32,
1672 restoration_tier,
1673 unfrozen_at: Utc::now(),
1674 })
1675 }
1676
1677 pub async fn get_frozen_memories(&self, limit: i32, offset: i64) -> Result<Vec<FrozenMemory>> {
1679 let frozen_memories = sqlx::query_as::<_, FrozenMemory>(
1680 r#"
1681 SELECT * FROM frozen_memories
1682 ORDER BY frozen_at DESC
1683 LIMIT $1 OFFSET $2
1684 "#,
1685 )
1686 .bind(limit)
1687 .bind(offset)
1688 .fetch_all(&self.pool)
1689 .await?;
1690
1691 Ok(frozen_memories)
1692 }
1693
1694 pub async fn search_frozen_memories(
1696 &self,
1697 query: &str,
1698 limit: i32,
1699 ) -> Result<Vec<FrozenMemory>> {
1700 let frozen_memories = sqlx::query_as::<_, FrozenMemory>(
1701 r#"
1702 SELECT * FROM frozen_memories
1703 WHERE
1704 convert_from(compressed_content, 'UTF8') ILIKE $1
1705 OR freeze_reason ILIKE $1
1706 ORDER BY frozen_at DESC
1707 LIMIT $2
1708 "#,
1709 )
1710 .bind(format!("%{query}%"))
1711 .bind(limit)
1712 .fetch_all(&self.pool)
1713 .await?;
1714
1715 Ok(frozen_memories)
1716 }
1717
1718 pub async fn get_tier_statistics(&self) -> Result<Vec<MemoryTierStatistics>> {
1720 let stats = sqlx::query_as::<_, MemoryTierStatistics>(
1721 r#"
1722 SELECT * FROM memory_tier_statistics
1723 WHERE snapshot_timestamp > NOW() - INTERVAL '24 hours'
1724 ORDER BY snapshot_timestamp DESC, tier
1725 "#,
1726 )
1727 .fetch_all(&self.pool)
1728 .await?;
1729
1730 Ok(stats)
1731 }
1732
1733 pub async fn update_tier_statistics(&self) -> Result<()> {
1735 sqlx::query("SELECT update_tier_statistics()")
1736 .execute(&self.pool)
1737 .await?;
1738
1739 Ok(())
1740 }
1741
1742 pub async fn search_by_consolidation(
1744 &self,
1745 request: ConsolidationSearchRequest,
1746 ) -> Result<Vec<Memory>> {
1747 let limit = request.limit.unwrap_or(10);
1748 let offset = request.offset.unwrap_or(0);
1749
1750 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
1752
1753 builder.add_consolidation_strength_range(
1755 request.min_consolidation_strength,
1756 request.max_consolidation_strength,
1757 );
1758
1759 builder.add_recall_probability_range(
1761 request.min_recall_probability,
1762 request.max_recall_probability,
1763 );
1764
1765 if let Some(tier) = &request.tier {
1767 builder.add_tier_filter(tier);
1768 }
1769
1770 builder.add_exclude_frozen(!request.include_frozen.unwrap_or(false));
1772
1773 builder.add_condition(
1775 "ORDER BY consolidation_strength DESC, recall_probability DESC NULLS LAST",
1776 );
1777 builder.add_pagination(limit as usize, offset as usize)?;
1778
1779 let query = builder.build_query();
1781 let sqlx_query = builder.bind_parameters_as(sqlx::query_as::<_, Memory>(&query));
1782
1783 let memories = sqlx_query.fetch_all(&self.pool).await?;
1784 Ok(memories)
1785 }
1786
1787 pub async fn update_memory_scores(
1789 &self,
1790 memory_id: Uuid,
1791 recency_score: f64,
1792 relevance_score: f64,
1793 ) -> Result<()> {
1794 sqlx::query(
1795 r#"
1796 UPDATE memories
1797 SET recency_score = $2,
1798 relevance_score = $3,
1799 updated_at = NOW()
1800 WHERE id = $1 AND status = 'active'
1801 "#,
1802 )
1803 .bind(memory_id)
1804 .bind(recency_score)
1805 .bind(relevance_score)
1806 .execute(&self.pool)
1807 .await?;
1808
1809 Ok(())
1810 }
1811
1812 pub async fn batch_update_three_component_scores(&self) -> Result<i64> {
1814 let start_time = Instant::now();
1815
1816 let result = sqlx::query(
1817 r#"
1818 UPDATE memories
1819 SET recency_score = calculate_recency_score(last_accessed_at, created_at, 0.005),
1820 relevance_score = LEAST(1.0,
1821 0.5 * importance_score +
1822 0.3 * LEAST(1.0, access_count / 100.0) +
1823 0.2
1824 ),
1825 updated_at = NOW()
1826 WHERE status = 'active'
1827 "#,
1828 )
1829 .execute(&self.pool)
1830 .await?;
1831
1832 let duration = start_time.elapsed();
1833 info!(
1834 "Updated three-component scores for {} memories in {:?}",
1835 result.rows_affected(),
1836 duration
1837 );
1838
1839 Ok(result.rows_affected() as i64)
1840 }
1841
1842 pub async fn get_memories_by_combined_score(
1844 &self,
1845 tier: Option<MemoryTier>,
1846 limit: Option<i32>,
1847 recency_weight: Option<f64>,
1848 importance_weight: Option<f64>,
1849 relevance_weight: Option<f64>,
1850 ) -> Result<Vec<Memory>> {
1851 let limit = limit.unwrap_or(50);
1852
1853 if recency_weight.is_some() || importance_weight.is_some() || relevance_weight.is_some() {
1857 warn!(
1858 "Custom weights not supported with generated combined_score column. Using fixed weights: 0.333, 0.333, 0.334"
1859 );
1860 }
1861
1862 let query = if let Some(tier) = tier {
1863 sqlx::query_as::<_, Memory>(
1864 r#"
1865 SELECT m.*
1866 FROM memories m
1867 WHERE m.status = 'active'
1868 AND m.tier = $1
1869 ORDER BY m.combined_score DESC, m.updated_at DESC
1870 LIMIT $2
1871 "#,
1872 )
1873 .bind(tier)
1874 .bind(limit as i64)
1875 } else {
1876 sqlx::query_as::<_, Memory>(
1877 r#"
1878 SELECT m.*
1879 FROM memories m
1880 WHERE m.status = 'active'
1881 ORDER BY m.combined_score DESC, m.updated_at DESC
1882 LIMIT $1
1883 "#,
1884 )
1885 .bind(limit as i64)
1886 };
1887
1888 let memories = query.fetch_all(&self.pool).await?;
1889
1890 debug!(
1891 "Retrieved {} memories ranked by generated combined_score for tier {:?}",
1892 memories.len(),
1893 tier
1894 );
1895
1896 Ok(memories)
1897 }
1898
1899 pub async fn get_memories_for_consolidation(
1903 &self,
1904 tier: Option<MemoryTier>,
1905 batch_size: usize,
1906 min_hours_since_last_processing: f64,
1907 ) -> Result<Vec<Memory>> {
1908 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
1910
1911 builder.add_last_access_interval(min_hours_since_last_processing);
1913
1914 if let Some(tier) = tier {
1916 builder.add_tier_filter(&tier);
1917 }
1918
1919 let threshold_bind_index = builder.bind_index;
1921 builder.add_recall_threshold_condition(constants::COLD_MIGRATION_THRESHOLD);
1922
1923 let order_condition = format!(
1924 "ORDER BY CASE WHEN recall_probability IS NULL THEN 1 WHEN recall_probability < ${threshold_bind_index} THEN 2 ELSE 3 END, last_accessed_at ASC NULLS FIRST, consolidation_strength ASC"
1925 );
1926 builder.add_condition(&order_condition);
1927
1928 builder.add_pagination(batch_size, 0)?;
1930
1931 let query = builder.build_query();
1933 let sqlx_query = builder.bind_parameters_as(sqlx::query_as::<_, Memory>(&query));
1934
1935 let memories = sqlx_query.fetch_all(&self.pool).await?;
1936 Ok(memories)
1937 }
1938
1939 pub async fn batch_update_consolidation(
1941 &self,
1942 updates: &[(Uuid, f64, f64)], ) -> Result<usize> {
1944 if updates.is_empty() {
1945 return Ok(0);
1946 }
1947
1948 let mut tx = self.pool.begin().await?;
1949 let mut updated_count = 0;
1950
1951 for (memory_id, new_strength, recall_prob) in updates {
1952 let result = sqlx::query(
1953 r#"
1954 UPDATE memories
1955 SET consolidation_strength = $1,
1956 recall_probability = $2,
1957 updated_at = NOW()
1958 WHERE id = $3 AND status = 'active'
1959 "#,
1960 )
1961 .bind(new_strength)
1962 .bind(recall_prob)
1963 .bind(memory_id)
1964 .execute(&mut *tx)
1965 .await?;
1966
1967 updated_count += result.rows_affected() as usize;
1968 }
1969
1970 tx.commit().await?;
1971 Ok(updated_count)
1972 }
1973
1974 pub async fn batch_migrate_memories(
1976 &self,
1977 migrations: &[(Uuid, MemoryTier)], ) -> Result<usize> {
1979 if migrations.is_empty() {
1980 return Ok(0);
1981 }
1982
1983 let mut tx = self.pool.begin().await?;
1984 let mut migrated_count = 0;
1985
1986 for (memory_id, target_tier) in migrations {
1987 let current_memory: Option<(MemoryTier,)> =
1989 sqlx::query_as("SELECT tier FROM memories WHERE id = $1 AND status = 'active'")
1990 .bind(memory_id)
1991 .fetch_optional(&mut *tx)
1992 .await?;
1993
1994 if let Some((current_tier,)) = current_memory {
1995 let result = sqlx::query(
1997 r#"
1998 UPDATE memories
1999 SET tier = $1, updated_at = NOW()
2000 WHERE id = $2 AND status = 'active'
2001 "#,
2002 )
2003 .bind(target_tier)
2004 .bind(memory_id)
2005 .execute(&mut *tx)
2006 .await?;
2007
2008 if result.rows_affected() > 0 {
2009 migrated_count += 1;
2010
2011 self.record_migration(
2013 &mut tx,
2014 *memory_id,
2015 current_tier,
2016 *target_tier,
2017 Some("Simple consolidation automatic migration".to_string()),
2018 )
2019 .await?;
2020 }
2021 }
2022 }
2023
2024 tx.commit().await?;
2025 Ok(migrated_count)
2026 }
2027
2028 pub async fn get_simple_consolidation_candidates(
2030 &self,
2031 tier: Option<MemoryTier>,
2032 threshold: f64,
2033 limit: usize,
2034 ) -> Result<Vec<Memory>> {
2035 let mut builder = SafeQueryBuilder::new(
2037 "SELECT * FROM memories WHERE status = 'active' AND (recall_probability < $1 OR recall_probability IS NULL)"
2038 );
2039
2040 if let Some(tier) = tier {
2042 builder.add_tier_filter(&tier);
2043 }
2044
2045 builder.add_condition("ORDER BY COALESCE(recall_probability, 0) ASC, consolidation_strength ASC, last_accessed_at ASC NULLS FIRST");
2047
2048 builder.add_pagination(limit, 0)?;
2050
2051 let query = builder.build_query();
2053 let mut sqlx_query = sqlx::query_as::<_, Memory>(&query).bind(threshold);
2054 sqlx_query = builder.bind_parameters_as(sqlx_query);
2055
2056 let memories = sqlx_query.fetch_all(&self.pool).await?;
2057 Ok(memories)
2058 }
2059
2060 pub async fn log_simple_consolidation_event(
2062 &self,
2063 memory_id: Uuid,
2064 previous_strength: f64,
2065 new_strength: f64,
2066 previous_probability: Option<f64>,
2067 new_probability: f64,
2068 processing_time_ms: u64,
2069 ) -> Result<()> {
2070 let context = serde_json::json!({
2071 "engine": "simple_consolidation",
2072 "processing_time_ms": processing_time_ms,
2073 "strength_delta": new_strength - previous_strength,
2074 "probability_delta": new_probability - previous_probability.unwrap_or(0.0)
2075 });
2076
2077 self.log_consolidation_event(
2078 memory_id,
2079 "simple_consolidation",
2080 previous_strength,
2081 new_strength,
2082 previous_probability,
2083 Some(new_probability),
2084 None, context,
2086 )
2087 .await
2088 }
2089
2090 pub async fn get_simple_consolidation_stats(&self) -> Result<SimpleConsolidationStats> {
2092 let stats = sqlx::query_as::<_, SimpleConsolidationStats>(
2093 r#"
2094 SELECT
2095 COUNT(*) FILTER (WHERE recall_probability < $1) as migration_candidates,
2096 COUNT(*) FILTER (WHERE consolidation_strength > 5.0) as highly_consolidated,
2097 AVG(consolidation_strength) as avg_consolidation_strength,
2098 AVG(recall_probability) as avg_recall_probability,
2099 COUNT(*) FILTER (WHERE last_accessed_at > NOW() - INTERVAL '24 hours') as recently_accessed,
2100 COUNT(*) as total_active_memories
2101 FROM memories
2102 WHERE status = 'active'
2103 "#,
2104 )
2105 .bind(constants::COLD_MIGRATION_THRESHOLD)
2106 .fetch_one(&self.pool)
2107 .await?;
2108
2109 Ok(stats)
2110 }
2111
2112 pub async fn get_trigger_metrics(&self) -> Option<super::event_triggers::TriggerMetrics> {
2114 if let Some(trigger_engine) = &self.trigger_engine {
2115 Some(trigger_engine.get_metrics().await)
2116 } else {
2117 None
2118 }
2119 }
2120
2121 pub async fn reset_trigger_metrics(&self) -> Result<()> {
2123 if let Some(trigger_engine) = &self.trigger_engine {
2124 trigger_engine.reset_metrics().await?;
2125 }
2126 Ok(())
2127 }
2128
2129 pub async fn add_user_trigger_customization(
2131 &self,
2132 user_id: String,
2133 customizations: std::collections::HashMap<
2134 super::event_triggers::TriggerEvent,
2135 super::event_triggers::TriggerPattern,
2136 >,
2137 ) -> Result<()> {
2138 if let Some(trigger_engine) = &self.trigger_engine {
2139 trigger_engine
2140 .add_user_customization(user_id, customizations)
2141 .await?;
2142 }
2143 Ok(())
2144 }
2145
    /// Whether this repository was constructed with an event-triggered scoring
    /// engine attached (trigger-related methods are no-ops without one).
    pub fn has_trigger_engine(&self) -> bool {
        self.trigger_engine.is_some()
    }
2150
2151 pub async fn batch_freeze_by_recall_probability(
2153 &self,
2154 max_batch_size: Option<usize>,
2155 ) -> Result<BatchFreezeResult> {
2156 use std::time::Instant;
2157
2158 let start_time = Instant::now();
2159 let batch_size = max_batch_size.unwrap_or(100_000); let candidates = sqlx::query_as::<_, Memory>(
2163 r#"
2164 SELECT * FROM memories
2165 WHERE tier = 'cold'
2166 AND status = 'active'
2167 AND COALESCE(recall_probability, 0) < 0.2
2168 ORDER BY recall_probability ASC, last_accessed_at ASC
2169 LIMIT $1
2170 "#,
2171 )
2172 .bind(batch_size as i64)
2173 .fetch_all(&self.pool)
2174 .await?;
2175
2176 let mut frozen_ids = Vec::new();
2177 let mut total_space_saved = 0u64;
2178 let mut compression_ratios = Vec::new();
2179
2180 info!("Starting batch freeze of {} memories", candidates.len());
2181
2182 for chunk in candidates.chunks(1000) {
2184 let mut tx = self.pool.begin().await?;
2185
2186 for memory in chunk {
2187 match sqlx::query("SELECT freeze_memory($1) as frozen_id")
2189 .bind(memory.id)
2190 .fetch_one(&mut *tx)
2191 .await
2192 {
2193 Ok(row) => {
2194 let frozen_id: Uuid = row.get("frozen_id");
2195 frozen_ids.push(frozen_id);
2196
2197 let original_size = memory.content.len() as u64;
2199 let estimated_compressed_size = original_size / 6; total_space_saved += original_size - estimated_compressed_size;
2201 compression_ratios.push(6.0);
2202 }
2203 Err(e) => {
2204 warn!("Failed to freeze memory {}: {}", memory.id, e);
2205 continue;
2206 }
2207 }
2208 }
2209
2210 tx.commit().await?;
2211 }
2212
2213 let processing_time = start_time.elapsed();
2214 let avg_compression_ratio = if !compression_ratios.is_empty() {
2215 compression_ratios.iter().sum::<f32>() / compression_ratios.len() as f32
2216 } else {
2217 0.0
2218 };
2219
2220 info!(
2221 "Batch freeze completed: {} memories frozen in {:?}, avg compression: {:.1}:1",
2222 frozen_ids.len(),
2223 processing_time,
2224 avg_compression_ratio
2225 );
2226
2227 Ok(BatchFreezeResult {
2228 memories_frozen: frozen_ids.len() as u32,
2229 total_space_saved_bytes: total_space_saved,
2230 average_compression_ratio: avg_compression_ratio,
2231 processing_time_ms: processing_time.as_millis() as u64,
2232 frozen_memory_ids: frozen_ids,
2233 })
2234 }
2235
2236 pub async fn batch_unfreeze_memories(
2238 &self,
2239 frozen_ids: Vec<Uuid>,
2240 target_tier: Option<MemoryTier>,
2241 ) -> Result<BatchUnfreezeResult> {
2242 use std::time::Instant;
2243
2244 let start_time = Instant::now();
2245 let mut unfrozen_memory_ids = Vec::new();
2246 let mut total_delay_seconds = 0i32;
2247
2248 info!("Starting batch unfreeze of {} memories", frozen_ids.len());
2249
2250 for chunk in frozen_ids.chunks(100) {
2252 for frozen_id in chunk {
2253 match self.unfreeze_memory(*frozen_id, target_tier).await {
2254 Ok(response) => {
2255 unfrozen_memory_ids.push(response.memory_id);
2256 total_delay_seconds += response.retrieval_delay_seconds;
2257 }
2258 Err(e) => {
2259 warn!("Failed to unfreeze memory {}: {}", frozen_id, e);
2260 continue;
2261 }
2262 }
2263 }
2264 }
2265
2266 let processing_time = start_time.elapsed();
2267 let avg_delay_seconds = if !unfrozen_memory_ids.is_empty() {
2268 total_delay_seconds as f32 / unfrozen_memory_ids.len() as f32
2269 } else {
2270 0.0
2271 };
2272
2273 info!(
2274 "Batch unfreeze completed: {} memories unfrozen in {:?}, avg delay: {:.1}s",
2275 unfrozen_memory_ids.len(),
2276 processing_time,
2277 avg_delay_seconds
2278 );
2279
2280 Ok(BatchUnfreezeResult {
2281 memories_unfrozen: unfrozen_memory_ids.len() as u32,
2282 total_processing_time_ms: processing_time.as_millis() as u64,
2283 average_delay_seconds: avg_delay_seconds,
2284 unfrozen_memory_ids,
2285 })
2286 }
2287
2288 pub async fn get_memories_for_forgetting(
2905 &self,
2906 tier: MemoryTier,
2907 batch_size: usize,
2908 ) -> Result<Vec<Memory>> {
2909 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
2910
2911 builder.add_tier_filter(&tier);
2913
2914 builder.add_condition("AND (updated_at IS NULL OR updated_at < NOW() - INTERVAL '1 hour')");
2916
2917 builder.add_condition("ORDER BY updated_at ASC NULLS FIRST");
2919
2920 builder.add_pagination(batch_size, 0)?;
2922
2923 let query = builder.build_query();
2924 let query_with_params = builder.bind_parameters_as(sqlx::query_as::<_, Memory>(&query));
2925
2926 let memories = query_with_params.fetch_all(&self.pool).await?;
2927 Ok(memories)
2928 }
2929
2930 pub async fn batch_update_decay_rates(
2932 &self,
2933 updates: &[(Uuid, f64)], ) -> Result<usize> {
2935 if updates.is_empty() {
2936 return Ok(0);
2937 }
2938
2939 let mut tx = self.pool.begin().await?;
2940 let mut updated_count = 0;
2941
2942 for (memory_id, new_decay_rate) in updates {
2943 let result = sqlx::query(
2946 "UPDATE memories SET decay_rate = $1, updated_at = NOW() WHERE id = $2 AND status = 'active'"
2947 )
2948 .bind(new_decay_rate)
2949 .bind(memory_id)
2950 .execute(&mut *tx)
2951 .await?;
2952
2953 updated_count += result.rows_affected() as usize;
2954 }
2955
2956 tx.commit().await?;
2957 Ok(updated_count)
2958 }
2959
2960 pub async fn batch_update_importance_scores(
2962 &self,
2963 updates: &[(Uuid, f64)], ) -> Result<usize> {
2965 if updates.is_empty() {
2966 return Ok(0);
2967 }
2968
2969 let mut tx = self.pool.begin().await?;
2970 let mut updated_count = 0;
2971
2972 for (memory_id, new_importance_score) in updates {
2973 let result = sqlx::query(
2976 "UPDATE memories SET importance_score = $1, updated_at = NOW() WHERE id = $2 AND status = 'active'"
2977 )
2978 .bind(new_importance_score)
2979 .bind(memory_id)
2980 .execute(&mut *tx)
2981 .await?;
2982
2983 updated_count += result.rows_affected() as usize;
2984 }
2985
2986 tx.commit().await?;
2987 Ok(updated_count)
2988 }
2989
2990 pub async fn batch_soft_delete_memories(&self, memory_ids: &[Uuid]) -> Result<usize> {
2992 if memory_ids.is_empty() {
2993 return Ok(0);
2994 }
2995
2996 let mut tx = self.pool.begin().await?;
2997 let mut deleted_count = 0;
2998
2999 for memory_id in memory_ids {
3000 match sqlx::query(
3003 "UPDATE memories SET status = 'deleted', updated_at = NOW() WHERE id = $1 AND status = 'active'"
3004 )
3005 .bind(memory_id)
3006 .execute(&mut *tx)
3007 .await
3008 {
3009 Ok(result) => {
3010 deleted_count += result.rows_affected() as usize;
3011 }
3012 Err(e) => {
3013 warn!("Failed to soft delete memory {}: {}", memory_id, e);
3014 }
3015 }
3016 }
3017
3018 tx.commit().await?;
3019 Ok(deleted_count)
3020 }
3021}
3022
/// Table-wide aggregates over `memories`, as produced by
/// `MemoryRepository::get_statistics`. Every field is `Option` because the
/// SQL aggregates return NULL when no rows match the filter.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize)]
pub struct MemoryStatistics {
    /// Active memories in the working tier.
    pub working_count: Option<i64>,
    /// Active memories in the warm tier.
    pub warm_count: Option<i64>,
    /// Active memories in the cold tier.
    pub cold_count: Option<i64>,
    /// All memories with status 'active', regardless of tier.
    pub total_active: Option<i64>,
    /// Memories with status 'deleted' (soft-deleted).
    pub total_deleted: Option<i64>,
    /// Mean importance score across active memories.
    pub avg_importance: Option<f64>,
    /// Highest access count among active memories.
    pub max_access_count: Option<i32>,
    /// Mean access count across active memories (CAST to FLOAT8 in SQL).
    pub avg_access_count: Option<f64>,
}
3034
/// Snapshot of consolidation health across active memories, as produced by
/// `MemoryRepository::get_simple_consolidation_stats`. Fields are `Option`
/// because the SQL aggregates return NULL on an empty table.
#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize)]
pub struct SimpleConsolidationStats {
    /// Active memories below the cold-migration recall-probability threshold.
    pub migration_candidates: Option<i64>,
    /// Active memories with consolidation strength above 5.0.
    pub highly_consolidated: Option<i64>,
    /// Mean consolidation strength across active memories.
    pub avg_consolidation_strength: Option<f64>,
    /// Mean recall probability across active memories.
    pub avg_recall_probability: Option<f64>,
    /// Active memories accessed within the last 24 hours.
    pub recently_accessed: Option<i64>,
    /// Total count of active memories.
    pub total_active_memories: Option<i64>,
}
3044
3045#[cfg(test)]
3046mod tests {
3047 use super::*;
3048
3049 #[test]
3050 fn test_content_hash_generation() {
3051 let content = "This is a test memory content";
3052 let hash1 = Memory::calculate_content_hash(content);
3053 let hash2 = Memory::calculate_content_hash(content);
3054
3055 assert_eq!(hash1, hash2);
3056 assert_eq!(hash1.len(), 64); }
3058
3059 #[test]
3060 fn test_should_migrate() {
3061 let mut memory = Memory::default();
3062
3063 memory.tier = MemoryTier::Working;
3065 memory.importance_score = 0.01;
3066 memory.consolidation_strength = 0.1;
3067 memory.access_count = 0;
3068 memory.last_accessed_at = Some(Utc::now() - chrono::Duration::days(30)); assert!(memory.should_migrate());
3070
3071 memory.importance_score = 0.9;
3073 memory.consolidation_strength = 8.0;
3074 memory.access_count = 100;
3075 memory.last_accessed_at = Some(Utc::now()); assert!(!memory.should_migrate());
3077
3078 memory.tier = MemoryTier::Cold;
3081 memory.importance_score = 0.0;
3082 memory.last_accessed_at = Some(Utc::now() - chrono::Duration::days(30)); memory.tier = MemoryTier::Frozen;
3088 assert!(!memory.should_migrate());
3089 }
3090
3091 #[test]
3092 fn test_next_tier() {
3093 let mut memory = Memory::default();
3094
3095 memory.tier = MemoryTier::Working;
3096 assert_eq!(memory.next_tier(), Some(MemoryTier::Warm));
3097
3098 memory.tier = MemoryTier::Warm;
3099 assert_eq!(memory.next_tier(), Some(MemoryTier::Cold));
3100
3101 memory.tier = MemoryTier::Cold;
3102 assert_eq!(memory.next_tier(), Some(MemoryTier::Frozen));
3103
3104 memory.tier = MemoryTier::Frozen;
3105 assert_eq!(memory.next_tier(), None);
3106 }
3107
3108 #[test]
3109 fn test_safe_query_builder_pagination_validation() {
3110 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories");
3111
3112 assert!(builder.add_pagination(100, 50).is_ok());
3114
3115 let mut builder2 = SafeQueryBuilder::new("SELECT * FROM memories");
3117 assert!(builder2.add_pagination(20000, 0).is_err());
3118
3119 let mut builder3 = SafeQueryBuilder::new("SELECT * FROM memories");
3121 assert!(builder3.add_pagination(10, 2000000).is_err());
3122 }
3123
3124 #[test]
3125 fn test_safe_query_builder_parameterization() {
3126 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3127
3128 let tier = MemoryTier::Working;
3130 builder.add_tier_filter(&tier);
3131
3132 builder.add_importance_range(Some(0.5), Some(0.9));
3134
3135 builder.add_pagination(10, 0).unwrap();
3137
3138 let query = builder.build_query();
3139
3140 assert!(query.contains("$1"));
3142 assert!(query.contains("$2"));
3143 assert!(query.contains("$3"));
3144 assert!(query.contains("$4"));
3145 assert!(query.contains("$5"));
3146
3147 assert!(!query.contains("0.5"));
3149 assert!(!query.contains("0.9"));
3150 assert!(!query.contains("Working"));
3151 }
3152
3153 #[test]
3154 fn test_safe_query_builder_sql_injection_prevention() {
3155 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3156
3157 let tier = MemoryTier::Working;
3159 builder.add_tier_filter(&tier);
3160
3161 builder.add_importance_range(Some(0.1), Some(1.0));
3163
3164 let query = builder.build_query();
3165
3166 assert!(query.contains("tier = $"));
3168 assert!(query.contains("importance_score >= $"));
3169 assert!(query.contains("importance_score <= $"));
3170
3171 assert!(!query.contains("'; DROP TABLE"));
3173 assert!(!query.contains("OR 1=1"));
3174 assert!(!query.contains("UNION SELECT"));
3175 }
3176
3177 #[test]
3178 fn test_consolidation_strength_range_parameterization() {
3179 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3180
3181 builder.add_consolidation_strength_range(Some(1.0), Some(5.0));
3182 builder.add_recall_probability_range(Some(0.1), Some(0.9));
3183
3184 let query = builder.build_query();
3185
3186 assert!(query.contains("consolidation_strength >= $"));
3188 assert!(query.contains("consolidation_strength <= $"));
3189 assert!(query.contains("recall_probability >= $"));
3190 assert!(query.contains("recall_probability <= $"));
3191
3192 assert!(!query.contains("1.0"));
3194 assert!(!query.contains("5.0"));
3195 assert!(!query.contains("0.1"));
3196 assert!(!query.contains("0.9"));
3197 }
3198
3199 #[test]
3200 fn test_date_range_parameterization() {
3201 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3202
3203 let start_date = Utc::now() - chrono::Duration::days(30);
3204 let end_date = Utc::now();
3205
3206 builder.add_date_range(Some(&start_date), Some(&end_date));
3207
3208 let query = builder.build_query();
3209
3210 assert!(query.contains("created_at >= $"));
3212 assert!(query.contains("created_at <= $"));
3213
3214 assert!(!query.contains(&start_date.format("%Y-%m-%d").to_string()));
3216 assert!(!query.contains(&end_date.format("%Y-%m-%d").to_string()));
3217 }
3218
3219 #[test]
3220 fn test_similarity_threshold_parameterization() {
3221 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3222
3223 builder.bind_index = 2; builder.add_similarity_threshold(0.7);
3226
3227 let query = builder.build_query();
3228
3229 assert!(query.contains("embedding <=> $1"));
3231 assert!(query.contains(">= $2"));
3232
3233 assert!(!query.contains("0.7"));
3235 }
3236
3237 #[test]
3238 fn test_query_builder_prevents_format_injection() {
3239 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3240
3241 let malicious_tier = MemoryTier::Working; builder.add_tier_filter(&malicious_tier);
3244
3245 builder.add_importance_range(Some(0.0), Some(1.0));
3247
3248 builder.add_exclude_frozen(true);
3250
3251 let query = builder.build_query();
3252
3253 assert!(query.matches('$').count() >= 3); assert!(query.contains("tier != 'frozen'"));
3258 assert!(query.contains("status = 'active'"));
3259
3260 assert!(!query.contains("{}"));
3262 assert!(!query.contains("{:"));
3263 }
3264
3265 #[test]
3266 fn test_complex_query_building_safety() {
3267 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3268
3269 builder.add_tier_filter(&MemoryTier::Working);
3271 builder.add_importance_range(Some(0.3), Some(0.8));
3272
3273 let start_date = Utc::now() - chrono::Duration::days(7);
3274 let end_date = Utc::now();
3275 builder.add_date_range(Some(&start_date), Some(&end_date));
3276
3277 builder.add_consolidation_strength_range(Some(2.0), None);
3278 builder.add_recall_probability_range(None, Some(0.9));
3279 builder.add_exclude_frozen(true);
3280
3281 builder.add_condition("ORDER BY importance_score DESC");
3282 builder.add_pagination(50, 100).unwrap();
3283
3284 let query = builder.build_query();
3285
3286 assert!(query.starts_with("SELECT * FROM memories WHERE status = 'active'"));
3288 assert!(query.contains("ORDER BY importance_score DESC"));
3289 assert!(query.contains("LIMIT"));
3290 assert!(query.contains("OFFSET"));
3291
3292 let param_count = query.matches('$').count();
3294 assert!(param_count >= 7); assert!(!query.contains("'; "));
3298 assert!(!query.contains("OR 1=1"));
3299 assert!(!query.contains("UNION"));
3300 assert!(!query.contains("--"));
3301 assert!(!query.contains("/*"));
3302 }
3303
3304 #[tokio::test]
3305 async fn test_get_memories_for_forgetting_query_structure() {
3306 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3308
3309 let tier = MemoryTier::Working;
3311 builder.add_tier_filter(&tier);
3312
3313 builder.add_condition("AND (updated_at IS NULL OR updated_at < NOW() - INTERVAL '1 hour')");
3315
3316 builder.add_condition("ORDER BY updated_at ASC NULLS FIRST");
3318
3319 let query = builder.build_query();
3320
3321 assert!(query.contains("SELECT * FROM memories WHERE status = 'active'"));
3323 assert!(query.contains("tier = $"));
3324 assert!(query.contains("updated_at IS NULL"));
3325 assert!(query.contains("updated_at < NOW() - INTERVAL '1 hour'"));
3326 assert!(query.contains("ORDER BY updated_at ASC NULLS FIRST"));
3327
3328 assert!(!query.contains("'; DROP"));
3330 assert!(!query.contains("OR 1=1"));
3331 }
3332
3333 #[test]
3334 fn test_clean_architecture_layer_separation() {
3335 let mut builder = SafeQueryBuilder::new("SELECT * FROM memories WHERE status = 'active'");
3342
3343 builder.add_tier_filter(&MemoryTier::Working);
3345 builder.add_condition("AND (updated_at IS NULL OR updated_at < NOW() - INTERVAL '1 hour')");
3346
3347 let query = builder.build_query();
3348
3349 assert!(query.len() > 50); assert!(query.contains("$")); assert!(!query.contains("'working'")); assert!(!query.contains("'; ")); }
3357
3358 #[test]
3359 fn test_repository_method_signatures() {
3360 let _test_fn: fn(
3368 &MemoryRepository,
3369 MemoryTier,
3370 usize,
3371 ) -> std::pin::Pin<
3372 Box<dyn std::future::Future<Output = Result<Vec<Memory>>> + Send + '_>,
3373 > = |repo, tier, batch_size| Box::pin(repo.get_memories_for_forgetting(tier, batch_size));
3374
3375 assert!(true);
3377 }
3378}