1use super::error::{MemoryError, Result};
2use super::models::*;
3use chrono::Utc;
4use pgvector::Vector;
5use sqlx::postgres::types::PgInterval;
6use sqlx::{PgPool, Postgres, Row, Transaction};
7use std::collections::HashMap;
8use std::time::Instant;
9use tracing::{debug, info};
10use uuid::Uuid;
11
12pub struct MemoryRepository {
13 pool: PgPool,
14}
15
16impl MemoryRepository {
17 pub fn new(pool: PgPool) -> Self {
18 Self { pool }
19 }
20
21 pub fn pool(&self) -> &PgPool {
22 &self.pool
23 }
24
25 pub async fn create_memory(&self, request: CreateMemoryRequest) -> Result<Memory> {
26 let id = Uuid::new_v4();
27 let content_hash = Memory::calculate_content_hash(&request.content);
28 let tier = request.tier.unwrap_or(MemoryTier::Working);
29
30 let skip_duplicate_check =
32 std::env::var("SKIP_DUPLICATE_CHECK").unwrap_or_else(|_| "false".to_string()) == "true";
33
34 if !skip_duplicate_check {
35 let duplicate_exists = sqlx::query_scalar::<_, bool>(
36 "SELECT EXISTS(SELECT 1 FROM memories WHERE content_hash = $1 AND tier = $2 AND status = 'active')"
37 )
38 .bind(&content_hash)
39 .bind(tier)
40 .fetch_one(&self.pool)
41 .await?;
42
43 if duplicate_exists {
44 return Err(MemoryError::DuplicateContent {
45 tier: format!("{tier:?}"),
46 });
47 }
48 }
49
50 let embedding = request.embedding.map(Vector::from);
51
52 let memory = sqlx::query_as::<_, Memory>(
53 r#"
54 INSERT INTO memories (
55 id, content, content_hash, embedding, tier, status,
56 importance_score, metadata, parent_id, expires_at,
57 consolidation_strength, decay_rate
58 )
59 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
60 RETURNING *
61 "#,
62 )
63 .bind(id)
64 .bind(&request.content)
65 .bind(&content_hash)
66 .bind(embedding)
67 .bind(tier)
68 .bind(MemoryStatus::Active)
69 .bind(request.importance_score.unwrap_or(0.5))
70 .bind(request.metadata.unwrap_or(serde_json::json!({})))
71 .bind(request.parent_id)
72 .bind(request.expires_at)
73 .bind(1.0_f64) .bind(1.0_f64) .fetch_one(&self.pool)
76 .await?;
77
78 info!("Created memory {} in tier {:?}", memory.id, memory.tier);
79 Ok(memory)
80 }
81
82 pub async fn get_memory(&self, id: Uuid) -> Result<Memory> {
83 let memory = sqlx::query_as::<_, Memory>(
84 r#"
85 UPDATE memories
86 SET access_count = access_count + 1,
87 last_accessed_at = NOW()
88 WHERE id = $1 AND status = 'active'
89 RETURNING *
90 "#,
91 )
92 .bind(id)
93 .fetch_optional(&self.pool)
94 .await?
95 .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;
96
97 debug!("Retrieved memory {} from tier {:?}", id, memory.tier);
98 Ok(memory)
99 }
100
101 pub async fn update_memory(&self, id: Uuid, request: UpdateMemoryRequest) -> Result<Memory> {
102 let mut tx = self.pool.begin().await?;
103
104 let current = sqlx::query_as::<_, Memory>(
106 "SELECT * FROM memories WHERE id = $1 AND status = 'active' FOR UPDATE",
107 )
108 .bind(id)
109 .fetch_optional(&mut *tx)
110 .await?
111 .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;
112
113 let content = request.content.as_ref().unwrap_or(¤t.content);
115 let content_hash = if request.content.is_some() {
116 Memory::calculate_content_hash(content)
117 } else {
118 current.content_hash.clone()
119 };
120
121 let embedding = request.embedding.map(Vector::from).or(current.embedding);
122 let tier = request.tier.unwrap_or(current.tier);
123 let importance_score = request.importance_score.unwrap_or(current.importance_score);
124 let metadata = request.metadata.as_ref().unwrap_or(¤t.metadata);
125 let expires_at = request.expires_at.or(current.expires_at);
126
127 let updated = sqlx::query_as::<_, Memory>(
128 r#"
129 UPDATE memories
130 SET content = $2, content_hash = $3, embedding = $4, tier = $5,
131 importance_score = $6, metadata = $7, expires_at = $8,
132 updated_at = NOW()
133 WHERE id = $1
134 RETURNING *
135 "#,
136 )
137 .bind(id)
138 .bind(content)
139 .bind(&content_hash)
140 .bind(embedding)
141 .bind(tier)
142 .bind(importance_score)
143 .bind(metadata)
144 .bind(expires_at)
145 .fetch_one(&mut *tx)
146 .await?;
147
148 if current.tier != tier {
150 self.record_migration(
151 &mut tx,
152 id,
153 current.tier,
154 tier,
155 Some("Manual update".to_string()),
156 )
157 .await?;
158 }
159
160 tx.commit().await?;
161 info!("Updated memory {}", id);
162 Ok(updated)
163 }
164
165 pub async fn delete_memory(&self, id: Uuid) -> Result<()> {
166 let result = sqlx::query(
167 "UPDATE memories SET status = 'deleted' WHERE id = $1 AND status = 'active'",
168 )
169 .bind(id)
170 .execute(&self.pool)
171 .await?;
172
173 if result.rows_affected() == 0 {
174 return Err(MemoryError::NotFound { id: id.to_string() });
175 }
176
177 info!("Soft deleted memory {}", id);
178 Ok(())
179 }
180
181 pub async fn search_memories(&self, request: SearchRequest) -> Result<SearchResponse> {
182 let start_time = Instant::now();
183
184 let search_type = request
185 .search_type
186 .as_ref()
187 .unwrap_or(&SearchType::Semantic)
188 .clone();
189 let limit = request.limit.unwrap_or(10);
190 let offset = request.offset.unwrap_or(0);
191
192 let results = match search_type {
193 SearchType::Semantic => self.semantic_search(&request).await?,
194 SearchType::Temporal => self.temporal_search(&request).await?,
195 SearchType::Hybrid => self.hybrid_search(&request).await?,
196 SearchType::FullText => self.fulltext_search(&request).await?,
197 };
198
199 let total_count = if request.include_facets.unwrap_or(false) {
200 Some(self.count_search_results(&request).await?)
201 } else {
202 None
203 };
204
205 let facets = if request.include_facets.unwrap_or(false) {
206 Some(self.generate_search_facets(&request).await?)
207 } else {
208 None
209 };
210
211 let suggestions = if request.query_text.is_some() {
212 Some(self.generate_query_suggestions(&request).await?)
213 } else {
214 None
215 };
216
217 let next_cursor = if results.len() as i32 >= limit {
218 Some(self.generate_cursor(offset + limit as i64, &request))
219 } else {
220 None
221 };
222
223 let execution_time_ms = start_time.elapsed().as_millis() as u64;
224
225 Ok(SearchResponse {
226 results,
227 total_count,
228 facets,
229 suggestions,
230 next_cursor,
231 execution_time_ms,
232 })
233 }
234
235 async fn semantic_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
236 let query_embedding = if let Some(ref embedding) = request.query_embedding {
237 Vector::from(embedding.clone())
238 } else {
239 return Err(MemoryError::InvalidRequest {
240 message: "Query embedding is required for semantic search".to_string(),
241 });
242 };
243
244 let limit = request.limit.unwrap_or(10);
245 let offset = request.offset.unwrap_or(0);
246 let threshold = request.similarity_threshold.unwrap_or(0.7);
247
248 let mut query_parts = vec![
249 "SELECT m.*, 1 - (m.embedding <=> $1) as similarity_score".to_string(),
250 "FROM memories m".to_string(),
251 "WHERE m.status = 'active' AND m.embedding IS NOT NULL".to_string(),
252 ];
253
254 self.add_filters(request, &mut query_parts)?;
256
257 query_parts.push(format!("AND 1 - (m.embedding <=> $1) >= {threshold}"));
258 query_parts.push("ORDER BY similarity_score DESC".to_string());
259 query_parts.push(format!("LIMIT {limit} OFFSET {offset}"));
260
261 let query = query_parts.join(" ");
262 let rows = sqlx::query(&query)
263 .bind(&query_embedding)
264 .fetch_all(&self.pool)
265 .await?;
266
267 self.build_search_results(rows, request).await
268 }
269
270 async fn temporal_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
271 let limit = request.limit.unwrap_or(10);
272 let offset = request.offset.unwrap_or(0);
273
274 let mut query_parts = vec![
275 "SELECT m.*, 0.0 as similarity_score".to_string(),
276 "FROM memories m".to_string(),
277 "WHERE m.status = 'active'".to_string(),
278 ];
279
280 self.add_filters(request, &mut query_parts)?;
282
283 query_parts.push("ORDER BY m.created_at DESC, m.updated_at DESC".to_string());
284 query_parts.push(format!("LIMIT {limit} OFFSET {offset}"));
285
286 let query = query_parts.join(" ");
287 let rows = sqlx::query(&query).fetch_all(&self.pool).await?;
288
289 self.build_search_results(rows, request).await
290 }
291
292 async fn hybrid_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
293 let weights = request.hybrid_weights.as_ref().unwrap_or(&HybridWeights {
295 semantic_weight: 0.333,
296 temporal_weight: 0.333, importance_weight: 0.334,
298 access_frequency_weight: 0.0, });
300
301 let query_embedding = if let Some(ref embedding) = request.query_embedding {
302 Vector::from(embedding.clone())
303 } else {
304 return Err(MemoryError::InvalidRequest {
305 message: "Query embedding is required for hybrid search".to_string(),
306 });
307 };
308
309 let limit = request.limit.unwrap_or(10);
310 let offset = request.offset.unwrap_or(0);
311 let threshold = request.similarity_threshold.unwrap_or(0.5);
312
313 sqlx::query(
315 r#"
316 UPDATE memories
317 SET recency_score = calculate_recency_score(last_accessed_at, created_at, 0.005),
318 relevance_score = LEAST(1.0,
319 0.5 * importance_score +
320 0.3 * LEAST(1.0, access_count / 100.0) +
321 0.2
322 )
323 WHERE status = 'active' AND embedding IS NOT NULL
324 "#,
325 )
326 .execute(&self.pool)
327 .await?;
328
329 let query = format!(
330 r#"
331 SELECT m.*,
332 1 - (m.embedding <=> $1) as similarity_score,
333 m.recency_score as temporal_score,
334 m.importance_score,
335 m.relevance_score,
336 COALESCE(m.access_count, 0) as access_count,
337 calculate_combined_score(
338 m.recency_score,
339 m.importance_score,
340 m.relevance_score,
341 {}, {}, {}
342 ) as combined_score
343 FROM memories m
344 WHERE m.status = 'active'
345 AND m.embedding IS NOT NULL
346 AND 1 - (m.embedding <=> $1) >= {}
347 ORDER BY combined_score DESC, similarity_score DESC
348 LIMIT {} OFFSET {}
349 "#,
350 weights.temporal_weight, weights.importance_weight, weights.semantic_weight, threshold,
354 limit,
355 offset
356 );
357
358 let rows = sqlx::query(&query)
359 .bind(&query_embedding)
360 .fetch_all(&self.pool)
361 .await?;
362
363 self.build_search_results(rows, request).await
364 }
365
366 async fn fulltext_search(&self, request: &SearchRequest) -> Result<Vec<SearchResult>> {
367 let query_text =
368 request
369 .query_text
370 .as_ref()
371 .ok_or_else(|| MemoryError::InvalidRequest {
372 message: "Query text is required for full-text search".to_string(),
373 })?;
374
375 let limit = request.limit.unwrap_or(10);
376 let offset = request.offset.unwrap_or(0);
377
378 let query = format!(
379 r#"
380 SELECT m.*,
381 ts_rank_cd(to_tsvector('english', m.content), plainto_tsquery('english', $1)) as similarity_score
382 FROM memories m
383 WHERE m.status = 'active'
384 AND to_tsvector('english', m.content) @@ plainto_tsquery('english', $1)
385 ORDER BY similarity_score DESC
386 LIMIT {limit} OFFSET {offset}
387 "#
388 );
389
390 let rows = sqlx::query(&query)
391 .bind(query_text)
392 .fetch_all(&self.pool)
393 .await?;
394
395 self.build_search_results(rows, request).await
396 }
397
398 fn add_filters(&self, request: &SearchRequest, query_parts: &mut Vec<String>) -> Result<()> {
399 if let Some(tier) = &request.tier {
400 query_parts.push(format!("AND m.tier = '{tier:?}'"));
401 }
402
403 if let Some(date_range) = &request.date_range {
404 if let Some(start) = &date_range.start {
405 query_parts.push(format!(
406 "AND m.created_at >= '{}'",
407 start.format("%Y-%m-%d %H:%M:%S")
408 ));
409 }
410 if let Some(end) = &date_range.end {
411 query_parts.push(format!(
412 "AND m.created_at <= '{}'",
413 end.format("%Y-%m-%d %H:%M:%S")
414 ));
415 }
416 }
417
418 if let Some(importance_range) = &request.importance_range {
419 if let Some(min) = importance_range.min {
420 query_parts.push(format!("AND m.importance_score >= {min}"));
421 }
422 if let Some(max) = importance_range.max {
423 query_parts.push(format!("AND m.importance_score <= {max}"));
424 }
425 }
426
427 Ok(())
428 }
429
430 async fn build_search_results(
431 &self,
432 rows: Vec<sqlx::postgres::PgRow>,
433 request: &SearchRequest,
434 ) -> Result<Vec<SearchResult>> {
435 let mut results = Vec::new();
436 let explain_score = request.explain_score.unwrap_or(false);
437
438 for row in rows {
439 let memory = Memory {
440 id: row.try_get("id")?,
441 content: row.try_get("content")?,
442 content_hash: row.try_get("content_hash")?,
443 embedding: row.try_get("embedding")?,
444 tier: row.try_get("tier")?,
445 status: row.try_get("status")?,
446 importance_score: row.try_get("importance_score")?,
447 access_count: row.try_get("access_count")?,
448 last_accessed_at: row.try_get("last_accessed_at")?,
449 metadata: row.try_get("metadata")?,
450 parent_id: row.try_get("parent_id")?,
451 created_at: row.try_get("created_at")?,
452 updated_at: row.try_get("updated_at")?,
453 expires_at: row.try_get("expires_at")?,
454 consolidation_strength: row.try_get("consolidation_strength").unwrap_or(1.0),
455 decay_rate: row.try_get("decay_rate").unwrap_or(1.0),
456 recall_probability: row.try_get("recall_probability")?,
457 last_recall_interval: row.try_get("last_recall_interval")?,
458 recency_score: row.try_get("recency_score").unwrap_or(0.0),
459 relevance_score: row.try_get("relevance_score").unwrap_or(0.0),
460 };
461
462 let similarity_score: f32 = row.try_get("similarity_score").unwrap_or(0.0);
463 let combined_score: f32 = row.try_get("combined_score").unwrap_or(similarity_score);
464 let temporal_score: Option<f32> = row.try_get("temporal_score").ok();
465 let access_frequency_score: Option<f32> = row.try_get("access_frequency_score").ok();
466 let importance_score = memory.importance_score; let score_explanation = if explain_score {
469 Some(ScoreExplanation {
470 semantic_contribution: similarity_score * 0.4,
471 temporal_contribution: temporal_score.unwrap_or(0.0) * 0.3,
472 importance_contribution: (importance_score * 0.2) as f32,
473 access_frequency_contribution: access_frequency_score.unwrap_or(0.0) * 0.1,
474 total_score: combined_score,
475 factors: vec![
476 "semantic similarity".to_string(),
477 "recency".to_string(),
478 "importance".to_string(),
479 ],
480 })
481 } else {
482 None
483 };
484
485 results.push(SearchResult {
486 memory,
487 similarity_score,
488 temporal_score,
489 importance_score,
490 access_frequency_score,
491 combined_score,
492 score_explanation,
493 });
494 }
495
496 debug!("Built {} search results", results.len());
497 Ok(results)
498 }
499
500 async fn count_search_results(&self, _request: &SearchRequest) -> Result<i64> {
501 let count: i64 =
503 sqlx::query_scalar("SELECT COUNT(*) FROM memories WHERE status = 'active'")
504 .fetch_one(&self.pool)
505 .await?;
506 Ok(count)
507 }
508
509 async fn generate_search_facets(&self, _request: &SearchRequest) -> Result<SearchFacets> {
510 let tier_rows: Vec<(String, i64)> = sqlx::query_as(
512 "SELECT tier, COUNT(*) FROM memories WHERE status = 'active' GROUP BY tier",
513 )
514 .fetch_all(&self.pool)
515 .await?;
516
517 let mut tiers = HashMap::new();
518 for (tier_str, count) in tier_rows {
519 if let Ok(tier) = tier_str.parse::<MemoryTier>() {
520 tiers.insert(tier, count);
521 }
522 }
523
524 let date_histogram = vec![DateBucket {
526 date: Utc::now(),
527 count: 10,
528 interval: "day".to_string(),
529 }];
530
531 let importance_ranges = vec![
533 ImportanceRange {
534 min: 0.0,
535 max: 0.3,
536 count: 5,
537 label: "Low".to_string(),
538 },
539 ImportanceRange {
540 min: 0.3,
541 max: 0.7,
542 count: 15,
543 label: "Medium".to_string(),
544 },
545 ImportanceRange {
546 min: 0.7,
547 max: 1.0,
548 count: 8,
549 label: "High".to_string(),
550 },
551 ];
552
553 Ok(SearchFacets {
554 tiers,
555 date_histogram,
556 importance_ranges,
557 tags: HashMap::new(), })
559 }
560
561 async fn generate_query_suggestions(&self, _request: &SearchRequest) -> Result<Vec<String>> {
562 Ok(vec![
564 "recent code changes".to_string(),
565 "function definitions".to_string(),
566 "error handling patterns".to_string(),
567 ])
568 }
569
570 fn generate_cursor(&self, offset: i64, _request: &SearchRequest) -> String {
571 use base64::{engine::general_purpose::STANDARD, Engine};
573 STANDARD.encode(format!("offset:{offset}"))
574 }
575
576 pub async fn search_memories_simple(
578 &self,
579 request: SearchRequest,
580 ) -> Result<Vec<SearchResult>> {
581 let response = self.search_memories(request).await?;
582 Ok(response.results)
583 }
584
585 pub async fn get_memories_by_tier(
586 &self,
587 tier: MemoryTier,
588 limit: Option<i64>,
589 ) -> Result<Vec<Memory>> {
590 let limit = limit.unwrap_or(100);
591
592 let memories = sqlx::query_as::<_, Memory>(
593 r#"
594 SELECT * FROM memories
595 WHERE tier = $1 AND status = 'active'
596 ORDER BY importance_score DESC, updated_at DESC
597 LIMIT $2
598 "#,
599 )
600 .bind(tier)
601 .bind(limit)
602 .fetch_all(&self.pool)
603 .await?;
604
605 Ok(memories)
606 }
607
608 pub async fn migrate_memory(
609 &self,
610 id: Uuid,
611 to_tier: MemoryTier,
612 reason: Option<String>,
613 ) -> Result<Memory> {
614 let mut tx = self.pool.begin().await?;
615
616 let current = sqlx::query_as::<_, Memory>(
618 "SELECT * FROM memories WHERE id = $1 AND status = 'active' FOR UPDATE",
619 )
620 .bind(id)
621 .fetch_optional(&mut *tx)
622 .await?
623 .ok_or_else(|| MemoryError::NotFound { id: id.to_string() })?;
624
625 if current.tier == to_tier {
626 return Ok(current);
627 }
628
629 let valid_transition = match (current.tier, to_tier) {
631 (MemoryTier::Working, MemoryTier::Warm)
632 | (MemoryTier::Working, MemoryTier::Cold)
633 | (MemoryTier::Warm, MemoryTier::Cold)
634 | (MemoryTier::Warm, MemoryTier::Working)
635 | (MemoryTier::Cold, MemoryTier::Warm) => true,
636 _ => false,
637 };
638
639 if !valid_transition {
640 return Err(MemoryError::InvalidTierTransition {
641 from: format!("{:?}", current.tier),
642 to: format!("{to_tier:?}"),
643 });
644 }
645
646 let start = std::time::Instant::now();
647
648 let updated = sqlx::query_as::<_, Memory>(
650 r#"
651 UPDATE memories
652 SET tier = $2, status = 'active', updated_at = NOW()
653 WHERE id = $1
654 RETURNING *
655 "#,
656 )
657 .bind(id)
658 .bind(to_tier)
659 .fetch_one(&mut *tx)
660 .await?;
661
662 let duration_ms = start.elapsed().as_millis() as i32;
663
664 self.record_migration(&mut tx, id, current.tier, to_tier, reason)
666 .await?;
667
668 tx.commit().await?;
669
670 info!(
671 "Migrated memory {} from {:?} to {:?} in {}ms",
672 id, current.tier, to_tier, duration_ms
673 );
674
675 Ok(updated)
676 }
677
678 async fn record_migration(
679 &self,
680 tx: &mut Transaction<'_, Postgres>,
681 memory_id: Uuid,
682 from_tier: MemoryTier,
683 to_tier: MemoryTier,
684 reason: Option<String>,
685 ) -> Result<()> {
686 sqlx::query(
687 r#"
688 INSERT INTO migration_history (memory_id, from_tier, to_tier, migration_reason, success)
689 VALUES ($1, $2, $3, $4, true)
690 "#,
691 )
692 .bind(memory_id)
693 .bind(from_tier)
694 .bind(to_tier)
695 .bind(reason)
696 .execute(&mut **tx)
697 .await?;
698
699 Ok(())
700 }
701
702 pub async fn get_expired_memories(&self) -> Result<Vec<Memory>> {
703 let memories = sqlx::query_as::<_, Memory>(
704 r#"
705 SELECT * FROM memories
706 WHERE status = 'active'
707 AND expires_at IS NOT NULL
708 AND expires_at < NOW()
709 "#,
710 )
711 .fetch_all(&self.pool)
712 .await?;
713
714 Ok(memories)
715 }
716
717 pub async fn cleanup_expired_memories(&self) -> Result<u64> {
718 let result = sqlx::query(
719 r#"
720 UPDATE memories
721 SET status = 'deleted'
722 WHERE status = 'active'
723 AND expires_at IS NOT NULL
724 AND expires_at < NOW()
725 "#,
726 )
727 .execute(&self.pool)
728 .await?;
729
730 let count = result.rows_affected();
731 if count > 0 {
732 info!("Cleaned up {} expired memories", count);
733 }
734
735 Ok(count)
736 }
737
738 pub async fn get_migration_candidates(
739 &self,
740 tier: MemoryTier,
741 limit: i64,
742 ) -> Result<Vec<Memory>> {
743 let query = match tier {
744 MemoryTier::Working => {
745 r#"
746 SELECT * FROM memories
747 WHERE tier = 'working'
748 AND status = 'active'
749 AND (
750 importance_score < 0.3
751 OR (last_accessed_at IS NOT NULL
752 AND last_accessed_at < NOW() - INTERVAL '24 hours')
753 )
754 ORDER BY importance_score ASC, last_accessed_at ASC NULLS FIRST
755 LIMIT $1
756 "#
757 }
758 MemoryTier::Warm => {
759 r#"
760 SELECT * FROM memories
761 WHERE tier = 'warm'
762 AND status = 'active'
763 AND importance_score < 0.1
764 AND updated_at < NOW() - INTERVAL '7 days'
765 ORDER BY importance_score ASC, updated_at ASC
766 LIMIT $1
767 "#
768 }
769 MemoryTier::Cold => {
770 return Ok(Vec::new());
771 }
772 MemoryTier::Frozen => {
773 return Ok(Vec::new()); }
775 };
776
777 let memories = sqlx::query_as::<_, Memory>(query)
778 .bind(limit)
779 .fetch_all(&self.pool)
780 .await?;
781
782 Ok(memories)
783 }
784
785 pub async fn get_statistics(&self) -> Result<MemoryStatistics> {
786 let stats = sqlx::query_as::<_, MemoryStatistics>(
787 r#"
788 SELECT
789 COUNT(*) FILTER (WHERE tier = 'working' AND status = 'active') as working_count,
790 COUNT(*) FILTER (WHERE tier = 'warm' AND status = 'active') as warm_count,
791 COUNT(*) FILTER (WHERE tier = 'cold' AND status = 'active') as cold_count,
792 COUNT(*) FILTER (WHERE status = 'active') as total_active,
793 COUNT(*) FILTER (WHERE status = 'deleted') as total_deleted,
794 AVG(importance_score) FILTER (WHERE status = 'active') as avg_importance,
795 MAX(access_count) FILTER (WHERE status = 'active') as max_access_count,
796 CAST(AVG(access_count) FILTER (WHERE status = 'active') AS FLOAT8) as avg_access_count
797 FROM memories
798 "#,
799 )
800 .fetch_one(&self.pool)
801 .await?;
802
803 Ok(stats)
804 }
805
806 pub async fn get_consolidation_analytics(&self) -> Result<Vec<ConsolidationAnalytics>> {
810 let analytics = sqlx::query_as::<_, ConsolidationAnalytics>(
811 r#"
812 SELECT
813 tier,
814 COUNT(*) as total_memories,
815 AVG(consolidation_strength) as avg_consolidation_strength,
816 AVG(recall_probability) as avg_recall_probability,
817 AVG(decay_rate) as avg_decay_rate,
818 AVG(EXTRACT(EPOCH FROM (NOW() - created_at)) / 86400) as avg_age_days,
819 COUNT(*) FILTER (WHERE recall_probability < 0.3) as migration_candidates,
820 COUNT(*) FILTER (WHERE last_accessed_at IS NULL) as never_accessed,
821 COUNT(*) FILTER (WHERE last_accessed_at > NOW() - INTERVAL '24 hours') as accessed_recently
822 FROM memories
823 WHERE status = 'active'
824 GROUP BY tier
825 ORDER BY
826 CASE tier
827 WHEN 'working' THEN 1
828 WHEN 'warm' THEN 2
829 WHEN 'cold' THEN 3
830 WHEN 'frozen' THEN 4
831 END
832 "#,
833 )
834 .fetch_all(&self.pool)
835 .await?;
836
837 Ok(analytics)
838 }
839
840 pub async fn get_consolidation_events(&self) -> Result<Vec<ConsolidationEventSummary>> {
842 let events = sqlx::query_as::<_, ConsolidationEventSummary>(
843 r#"
844 SELECT
845 event_type,
846 COUNT(*) as event_count,
847 AVG(new_consolidation_strength - previous_consolidation_strength) as avg_strength_change,
848 AVG(COALESCE(new_recall_probability, 0) - COALESCE(previous_recall_probability, 0)) as avg_probability_change,
849 AVG(EXTRACT(EPOCH FROM recall_interval) / 3600) as avg_recall_interval_hours
850 FROM memory_consolidation_log
851 WHERE created_at > NOW() - INTERVAL '7 days'
852 GROUP BY event_type
853 ORDER BY event_count DESC
854 "#,
855 )
856 .fetch_all(&self.pool)
857 .await?;
858
859 Ok(events)
860 }
861
862 pub async fn find_migration_candidates(
864 &self,
865 tier: MemoryTier,
866 limit: i32,
867 ) -> Result<Vec<Memory>> {
868 let threshold = match tier {
869 MemoryTier::Working => 0.7,
870 MemoryTier::Warm => 0.5,
871 MemoryTier::Cold => 0.2,
872 MemoryTier::Frozen => 0.0, };
874
875 let memories = sqlx::query_as::<_, Memory>(
876 r#"
877 SELECT * FROM memories
878 WHERE tier = $1
879 AND status = 'active'
880 AND (recall_probability < $2 OR recall_probability IS NULL)
881 ORDER BY recall_probability ASC NULLS LAST, consolidation_strength ASC
882 LIMIT $3
883 "#,
884 )
885 .bind(tier)
886 .bind(threshold)
887 .bind(limit)
888 .fetch_all(&self.pool)
889 .await?;
890
891 Ok(memories)
892 }
893
894 pub async fn update_consolidation(
896 &self,
897 memory_id: Uuid,
898 consolidation_strength: f64,
899 decay_rate: f64,
900 recall_probability: Option<f64>,
901 ) -> Result<()> {
902 sqlx::query(
903 r#"
904 UPDATE memories
905 SET consolidation_strength = $2,
906 decay_rate = $3,
907 recall_probability = $4,
908 updated_at = NOW()
909 WHERE id = $1 AND status = 'active'
910 "#,
911 )
912 .bind(memory_id)
913 .bind(consolidation_strength)
914 .bind(decay_rate)
915 .bind(recall_probability)
916 .execute(&self.pool)
917 .await?;
918
919 Ok(())
920 }
921
922 pub async fn log_consolidation_event(
924 &self,
925 memory_id: Uuid,
926 event_type: &str,
927 previous_strength: f64,
928 new_strength: f64,
929 previous_probability: Option<f64>,
930 new_probability: Option<f64>,
931 recall_interval: Option<PgInterval>,
932 context: serde_json::Value,
933 ) -> Result<()> {
934 sqlx::query(
935 r#"
936 INSERT INTO memory_consolidation_log (
937 memory_id, event_type, previous_consolidation_strength,
938 new_consolidation_strength, previous_recall_probability,
939 new_recall_probability, recall_interval, access_context
940 )
941 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
942 "#,
943 )
944 .bind(memory_id)
945 .bind(event_type)
946 .bind(previous_strength)
947 .bind(new_strength)
948 .bind(previous_probability)
949 .bind(new_probability)
950 .bind(recall_interval)
951 .bind(context)
952 .execute(&self.pool)
953 .await?;
954
955 Ok(())
956 }
957
958 pub async fn freeze_memory(
960 &self,
961 memory_id: Uuid,
962 _reason: Option<String>,
963 ) -> Result<FreezeMemoryResponse> {
964 let mut tx = self.pool.begin().await?;
965
966 let frozen_row = sqlx::query("SELECT freeze_memory($1) as frozen_id")
968 .bind(memory_id)
969 .fetch_one(&mut *tx)
970 .await?;
971
972 let frozen_id: Uuid = frozen_row.get("frozen_id");
973
974 let frozen_memory =
976 sqlx::query_as::<_, FrozenMemory>("SELECT * FROM frozen_memories WHERE id = $1")
977 .bind(frozen_id)
978 .fetch_one(&mut *tx)
979 .await?;
980
981 tx.commit().await?;
982
983 Ok(FreezeMemoryResponse {
984 frozen_id,
985 compression_ratio: frozen_memory.compression_ratio,
986 original_tier: MemoryTier::Cold, frozen_at: frozen_memory.frozen_at,
988 })
989 }
990
991 pub async fn unfreeze_memory(
993 &self,
994 frozen_id: Uuid,
995 target_tier: Option<MemoryTier>,
996 ) -> Result<UnfreezeMemoryResponse> {
997 let mut tx = self.pool.begin().await?;
998
999 let _frozen_memory =
1001 sqlx::query_as::<_, FrozenMemory>("SELECT * FROM frozen_memories WHERE id = $1")
1002 .bind(frozen_id)
1003 .fetch_one(&mut *tx)
1004 .await?;
1005
1006 let memory_row = sqlx::query("SELECT unfreeze_memory($1) as memory_id")
1008 .bind(frozen_id)
1009 .fetch_one(&mut *tx)
1010 .await?;
1011
1012 let memory_id: Uuid = memory_row.get("memory_id");
1013
1014 let restoration_tier = if let Some(tier) = target_tier {
1016 sqlx::query("UPDATE memories SET tier = $1 WHERE id = $2")
1017 .bind(tier)
1018 .bind(memory_id)
1019 .execute(&mut *tx)
1020 .await?;
1021 tier
1022 } else {
1023 MemoryTier::Working };
1025
1026 tx.commit().await?;
1027
1028 Ok(UnfreezeMemoryResponse {
1029 memory_id,
1030 retrieval_delay_seconds: 0, restoration_tier,
1032 unfrozen_at: Utc::now(),
1033 })
1034 }
1035
1036 pub async fn get_frozen_memories(&self, limit: i32, offset: i64) -> Result<Vec<FrozenMemory>> {
1038 let frozen_memories = sqlx::query_as::<_, FrozenMemory>(
1039 r#"
1040 SELECT * FROM frozen_memories
1041 ORDER BY frozen_at DESC
1042 LIMIT $1 OFFSET $2
1043 "#,
1044 )
1045 .bind(limit)
1046 .bind(offset)
1047 .fetch_all(&self.pool)
1048 .await?;
1049
1050 Ok(frozen_memories)
1051 }
1052
1053 pub async fn search_frozen_memories(
1055 &self,
1056 query: &str,
1057 limit: i32,
1058 ) -> Result<Vec<FrozenMemory>> {
1059 let frozen_memories = sqlx::query_as::<_, FrozenMemory>(
1060 r#"
1061 SELECT * FROM frozen_memories
1062 WHERE
1063 convert_from(compressed_content, 'UTF8') ILIKE $1
1064 OR freeze_reason ILIKE $1
1065 ORDER BY frozen_at DESC
1066 LIMIT $2
1067 "#,
1068 )
1069 .bind(format!("%{query}%"))
1070 .bind(limit)
1071 .fetch_all(&self.pool)
1072 .await?;
1073
1074 Ok(frozen_memories)
1075 }
1076
1077 pub async fn get_tier_statistics(&self) -> Result<Vec<MemoryTierStatistics>> {
1079 let stats = sqlx::query_as::<_, MemoryTierStatistics>(
1080 r#"
1081 SELECT * FROM memory_tier_statistics
1082 WHERE snapshot_timestamp > NOW() - INTERVAL '24 hours'
1083 ORDER BY snapshot_timestamp DESC, tier
1084 "#,
1085 )
1086 .fetch_all(&self.pool)
1087 .await?;
1088
1089 Ok(stats)
1090 }
1091
1092 pub async fn update_tier_statistics(&self) -> Result<()> {
1094 sqlx::query("SELECT update_tier_statistics()")
1095 .execute(&self.pool)
1096 .await?;
1097
1098 Ok(())
1099 }
1100
1101 pub async fn search_by_consolidation(
1103 &self,
1104 request: ConsolidationSearchRequest,
1105 ) -> Result<Vec<Memory>> {
1106 let mut conditions = Vec::new();
1107 let mut bind_index = 1;
1108
1109 if request.min_consolidation_strength.is_some() {
1111 conditions.push(format!("consolidation_strength >= ${bind_index}"));
1112 bind_index += 1;
1113 }
1114 if request.max_consolidation_strength.is_some() {
1115 conditions.push(format!("consolidation_strength <= ${bind_index}"));
1116 bind_index += 1;
1117 }
1118 if request.min_recall_probability.is_some() {
1119 conditions.push(format!("recall_probability >= ${bind_index}"));
1120 bind_index += 1;
1121 }
1122 if request.max_recall_probability.is_some() {
1123 conditions.push(format!("recall_probability <= ${bind_index}"));
1124 bind_index += 1;
1125 }
1126 if request.tier.is_some() {
1127 conditions.push(format!("tier = ${bind_index}"));
1128 bind_index += 1;
1129 }
1130
1131 if !request.include_frozen.unwrap_or(false) {
1132 conditions.push("tier != 'frozen'".to_string());
1133 }
1134
1135 conditions.push("status = 'active'".to_string());
1136
1137 let where_clause = if conditions.is_empty() {
1138 "WHERE status = 'active'".to_string()
1139 } else {
1140 format!("WHERE {}", conditions.join(" AND "))
1141 };
1142
1143 let query = format!(
1144 r#"
1145 SELECT * FROM memories
1146 {}
1147 ORDER BY consolidation_strength DESC, recall_probability DESC NULLS LAST
1148 LIMIT ${} OFFSET ${}
1149 "#,
1150 where_clause,
1151 bind_index,
1152 bind_index + 1
1153 );
1154
1155 let mut query_builder = sqlx::query_as::<_, Memory>(&query);
1156
1157 if let Some(val) = request.min_consolidation_strength {
1159 query_builder = query_builder.bind(val);
1160 }
1161 if let Some(val) = request.max_consolidation_strength {
1162 query_builder = query_builder.bind(val);
1163 }
1164 if let Some(val) = request.min_recall_probability {
1165 query_builder = query_builder.bind(val);
1166 }
1167 if let Some(val) = request.max_recall_probability {
1168 query_builder = query_builder.bind(val);
1169 }
1170 if let Some(val) = request.tier {
1171 query_builder = query_builder.bind(val);
1172 }
1173
1174 let limit = request.limit.unwrap_or(10);
1175 let offset = request.offset.unwrap_or(0);
1176 query_builder = query_builder.bind(limit).bind(offset);
1177
1178 let memories = query_builder.fetch_all(&self.pool).await?;
1179 Ok(memories)
1180 }
1181
1182 pub async fn update_memory_scores(
1184 &self,
1185 memory_id: Uuid,
1186 recency_score: f64,
1187 relevance_score: f64,
1188 ) -> Result<()> {
1189 sqlx::query(
1190 r#"
1191 UPDATE memories
1192 SET recency_score = $2,
1193 relevance_score = $3,
1194 updated_at = NOW()
1195 WHERE id = $1 AND status = 'active'
1196 "#,
1197 )
1198 .bind(memory_id)
1199 .bind(recency_score)
1200 .bind(relevance_score)
1201 .execute(&self.pool)
1202 .await?;
1203
1204 Ok(())
1205 }
1206
1207 pub async fn batch_update_three_component_scores(&self) -> Result<i64> {
1209 let start_time = Instant::now();
1210
1211 let result = sqlx::query(
1212 r#"
1213 UPDATE memories
1214 SET recency_score = calculate_recency_score(last_accessed_at, created_at, 0.005),
1215 relevance_score = LEAST(1.0,
1216 0.5 * importance_score +
1217 0.3 * LEAST(1.0, access_count / 100.0) +
1218 0.2
1219 ),
1220 updated_at = NOW()
1221 WHERE status = 'active'
1222 "#,
1223 )
1224 .execute(&self.pool)
1225 .await?;
1226
1227 let duration = start_time.elapsed();
1228 info!(
1229 "Updated three-component scores for {} memories in {:?}",
1230 result.rows_affected(),
1231 duration
1232 );
1233
1234 Ok(result.rows_affected() as i64)
1235 }
1236
1237 pub async fn get_memories_by_combined_score(
1239 &self,
1240 tier: Option<MemoryTier>,
1241 limit: Option<i32>,
1242 recency_weight: Option<f64>,
1243 importance_weight: Option<f64>,
1244 relevance_weight: Option<f64>,
1245 ) -> Result<Vec<Memory>> {
1246 let limit = limit.unwrap_or(50);
1247 let recency_w = recency_weight.unwrap_or(0.333);
1248 let importance_w = importance_weight.unwrap_or(0.333);
1249 let relevance_w = relevance_weight.unwrap_or(0.334);
1250
1251 let query = if let Some(tier) = tier {
1252 sqlx::query_as::<_, Memory>(
1253 r#"
1254 SELECT m.*
1255 FROM memories m
1256 WHERE m.status = 'active'
1257 AND m.tier = $1
1258 ORDER BY calculate_combined_score(m.recency_score, m.importance_score, m.relevance_score, $2, $3, $4) DESC, m.updated_at DESC
1259 LIMIT $5
1260 "#
1261 )
1262 .bind(format!("{:?}", tier).to_lowercase())
1263 .bind(recency_w)
1264 .bind(importance_w)
1265 .bind(relevance_w)
1266 .bind(limit as i64)
1267 } else {
1268 sqlx::query_as::<_, Memory>(
1269 r#"
1270 SELECT m.*
1271 FROM memories m
1272 WHERE m.status = 'active'
1273 ORDER BY calculate_combined_score(m.recency_score, m.importance_score, m.relevance_score, $1, $2, $3) DESC, m.updated_at DESC
1274 LIMIT $4
1275 "#
1276 )
1277 .bind(recency_w)
1278 .bind(importance_w)
1279 .bind(relevance_w)
1280 .bind(limit as i64)
1281 };
1282
1283 let memories = query.fetch_all(&self.pool).await?;
1284
1285 debug!(
1286 "Retrieved {} memories ranked by three-component score for tier {:?}",
1287 memories.len(),
1288 tier
1289 );
1290
1291 Ok(memories)
1292 }
1293
1294 pub async fn get_memories_for_consolidation(
1298 &self,
1299 tier: Option<MemoryTier>,
1300 batch_size: usize,
1301 min_hours_since_last_processing: f64,
1302 ) -> Result<Vec<Memory>> {
1303 let tier_filter = if let Some(tier) = tier {
1304 format!("AND tier = '{:?}'", tier).to_lowercase()
1305 } else {
1306 String::new()
1307 };
1308
1309 let query = format!(
1310 r#"
1311 SELECT * FROM memories
1312 WHERE status = 'active'
1313 AND (last_accessed_at IS NULL OR last_accessed_at < NOW() - INTERVAL '{} hours')
1314 {}
1315 ORDER BY
1316 CASE
1317 WHEN recall_probability IS NULL THEN 1
1318 WHEN recall_probability < 0.86 THEN 2
1319 ELSE 3
1320 END,
1321 last_accessed_at ASC NULLS FIRST,
1322 consolidation_strength ASC
1323 LIMIT $1
1324 "#,
1325 min_hours_since_last_processing, tier_filter
1326 );
1327
1328 let memories = sqlx::query_as::<_, Memory>(&query)
1329 .bind(batch_size as i64)
1330 .fetch_all(&self.pool)
1331 .await?;
1332
1333 Ok(memories)
1334 }
1335
1336 pub async fn batch_update_consolidation(
1338 &self,
1339 updates: &[(Uuid, f64, f64)], ) -> Result<usize> {
1341 if updates.is_empty() {
1342 return Ok(0);
1343 }
1344
1345 let mut tx = self.pool.begin().await?;
1346 let mut updated_count = 0;
1347
1348 for (memory_id, new_strength, recall_prob) in updates {
1349 let result = sqlx::query(
1350 r#"
1351 UPDATE memories
1352 SET consolidation_strength = $1,
1353 recall_probability = $2,
1354 updated_at = NOW()
1355 WHERE id = $3 AND status = 'active'
1356 "#,
1357 )
1358 .bind(new_strength)
1359 .bind(recall_prob)
1360 .bind(memory_id)
1361 .execute(&mut *tx)
1362 .await?;
1363
1364 updated_count += result.rows_affected() as usize;
1365 }
1366
1367 tx.commit().await?;
1368 Ok(updated_count)
1369 }
1370
1371 pub async fn batch_migrate_memories(
1373 &self,
1374 migrations: &[(Uuid, MemoryTier)], ) -> Result<usize> {
1376 if migrations.is_empty() {
1377 return Ok(0);
1378 }
1379
1380 let mut tx = self.pool.begin().await?;
1381 let mut migrated_count = 0;
1382
1383 for (memory_id, target_tier) in migrations {
1384 let current_memory: Option<(MemoryTier,)> =
1386 sqlx::query_as("SELECT tier FROM memories WHERE id = $1 AND status = 'active'")
1387 .bind(memory_id)
1388 .fetch_optional(&mut *tx)
1389 .await?;
1390
1391 if let Some((current_tier,)) = current_memory {
1392 let result = sqlx::query(
1394 r#"
1395 UPDATE memories
1396 SET tier = $1, updated_at = NOW()
1397 WHERE id = $2 AND status = 'active'
1398 "#,
1399 )
1400 .bind(target_tier)
1401 .bind(memory_id)
1402 .execute(&mut *tx)
1403 .await?;
1404
1405 if result.rows_affected() > 0 {
1406 migrated_count += 1;
1407
1408 self.record_migration(
1410 &mut tx,
1411 *memory_id,
1412 current_tier,
1413 *target_tier,
1414 Some("Simple consolidation automatic migration".to_string()),
1415 )
1416 .await?;
1417 }
1418 }
1419 }
1420
1421 tx.commit().await?;
1422 Ok(migrated_count)
1423 }
1424
1425 pub async fn get_simple_consolidation_candidates(
1427 &self,
1428 tier: Option<MemoryTier>,
1429 threshold: f64,
1430 limit: usize,
1431 ) -> Result<Vec<Memory>> {
1432 let tier_filter = if let Some(tier) = tier {
1433 format!("AND tier = '{:?}'", tier).to_lowercase()
1434 } else {
1435 String::new()
1436 };
1437
1438 let query = format!(
1439 r#"
1440 SELECT * FROM memories
1441 WHERE status = 'active'
1442 AND (recall_probability < $1 OR recall_probability IS NULL)
1443 {}
1444 ORDER BY
1445 COALESCE(recall_probability, 0) ASC,
1446 consolidation_strength ASC,
1447 last_accessed_at ASC NULLS FIRST
1448 LIMIT $2
1449 "#,
1450 tier_filter
1451 );
1452
1453 let memories = sqlx::query_as::<_, Memory>(&query)
1454 .bind(threshold)
1455 .bind(limit as i64)
1456 .fetch_all(&self.pool)
1457 .await?;
1458
1459 Ok(memories)
1460 }
1461
1462 pub async fn log_simple_consolidation_event(
1464 &self,
1465 memory_id: Uuid,
1466 previous_strength: f64,
1467 new_strength: f64,
1468 previous_probability: Option<f64>,
1469 new_probability: f64,
1470 processing_time_ms: u64,
1471 ) -> Result<()> {
1472 let context = serde_json::json!({
1473 "engine": "simple_consolidation",
1474 "processing_time_ms": processing_time_ms,
1475 "strength_delta": new_strength - previous_strength,
1476 "probability_delta": new_probability - previous_probability.unwrap_or(0.0)
1477 });
1478
1479 self.log_consolidation_event(
1480 memory_id,
1481 "simple_consolidation",
1482 previous_strength,
1483 new_strength,
1484 previous_probability,
1485 Some(new_probability),
1486 None, context,
1488 )
1489 .await
1490 }
1491
1492 pub async fn get_simple_consolidation_stats(&self) -> Result<SimpleConsolidationStats> {
1494 let stats = sqlx::query_as::<_, SimpleConsolidationStats>(
1495 r#"
1496 SELECT
1497 COUNT(*) FILTER (WHERE recall_probability < 0.86) as migration_candidates,
1498 COUNT(*) FILTER (WHERE consolidation_strength > 5.0) as highly_consolidated,
1499 AVG(consolidation_strength) as avg_consolidation_strength,
1500 AVG(recall_probability) as avg_recall_probability,
1501 COUNT(*) FILTER (WHERE last_accessed_at > NOW() - INTERVAL '24 hours') as recently_accessed,
1502 COUNT(*) as total_active_memories
1503 FROM memories
1504 WHERE status = 'active'
1505 "#,
1506 )
1507 .fetch_one(&self.pool)
1508 .await?;
1509
1510 Ok(stats)
1511 }
1512}
1513
1514#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize)]
1515pub struct MemoryStatistics {
1516 pub working_count: Option<i64>,
1517 pub warm_count: Option<i64>,
1518 pub cold_count: Option<i64>,
1519 pub total_active: Option<i64>,
1520 pub total_deleted: Option<i64>,
1521 pub avg_importance: Option<f64>,
1522 pub max_access_count: Option<i32>,
1523 pub avg_access_count: Option<f64>,
1524}
1525
1526#[derive(Debug, Clone, sqlx::FromRow, serde::Serialize, serde::Deserialize)]
1527pub struct SimpleConsolidationStats {
1528 pub migration_candidates: Option<i64>,
1529 pub highly_consolidated: Option<i64>,
1530 pub avg_consolidation_strength: Option<f64>,
1531 pub avg_recall_probability: Option<f64>,
1532 pub recently_accessed: Option<i64>,
1533 pub total_active_memories: Option<i64>,
1534}
1535
1536#[cfg(test)]
1537mod tests {
1538 use super::*;
1539
1540 #[test]
1541 fn test_content_hash_generation() {
1542 let content = "This is a test memory content";
1543 let hash1 = Memory::calculate_content_hash(content);
1544 let hash2 = Memory::calculate_content_hash(content);
1545
1546 assert_eq!(hash1, hash2);
1547 assert_eq!(hash1.len(), 64); }
1549
1550 #[test]
1551 fn test_should_migrate() {
1552 let mut memory = Memory::default();
1553
1554 memory.tier = MemoryTier::Working;
1556 memory.importance_score = 0.01;
1557 memory.consolidation_strength = 0.1;
1558 memory.access_count = 0;
1559 memory.last_accessed_at = Some(Utc::now() - chrono::Duration::days(30)); assert!(memory.should_migrate());
1561
1562 memory.importance_score = 0.9;
1564 memory.consolidation_strength = 8.0;
1565 memory.access_count = 100;
1566 memory.last_accessed_at = Some(Utc::now()); assert!(!memory.should_migrate());
1568
1569 memory.tier = MemoryTier::Cold;
1572 memory.importance_score = 0.0;
1573 memory.last_accessed_at = Some(Utc::now() - chrono::Duration::days(30)); memory.tier = MemoryTier::Frozen;
1579 assert!(!memory.should_migrate());
1580 }
1581
1582 #[test]
1583 fn test_next_tier() {
1584 let mut memory = Memory::default();
1585
1586 memory.tier = MemoryTier::Working;
1587 assert_eq!(memory.next_tier(), Some(MemoryTier::Warm));
1588
1589 memory.tier = MemoryTier::Warm;
1590 assert_eq!(memory.next_tier(), Some(MemoryTier::Cold));
1591
1592 memory.tier = MemoryTier::Cold;
1593 assert_eq!(memory.next_tier(), Some(MemoryTier::Frozen));
1594
1595 memory.tier = MemoryTier::Frozen;
1596 assert_eq!(memory.next_tier(), None);
1597 }
1598}