Skip to main content

cognee_database/sync/
repository.rs

//! `SyncOperationRepository` trait and its row DTO.
//!
//! Mirrors Python's `cognee/modules/sync/methods/` module 1:1.
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use uuid::Uuid;
8
9use crate::types::DatabaseError;
10
/// Lifecycle state of a `sync_operations` row, as seen through the
/// repository surface.
///
/// The string form of each variant (see [`SyncOperationStatus::as_str`]) is
/// meant to match the values the Python implementation stores verbatim, so
/// those strings must only change in lock-step with it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SyncOperationStatus {
    /// Initial state written by `SyncOperationRepository::create_operation`.
    Started,
    /// Set by `SyncOperationRepository::mark_started`.
    InProgress,
    /// Set by `SyncOperationRepository::mark_completed`.
    Completed,
    /// Set by `SyncOperationRepository::mark_failed`.
    Failed,
    /// Not written by any `SyncOperationRepository` method in this module.
    /// NOTE(review): confirm which code path records cancellations.
    Cancelled,
}

impl SyncOperationStatus {
    /// Wire/DB string for this status (e.g. `"in_progress"`).
    #[must_use]
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::Started => "started",
            Self::InProgress => "in_progress",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Cancelled => "cancelled",
        }
    }

    /// Inverse of [`as_str`](Self::as_str): maps a stored status string
    /// (such as `SyncOperationRow::status`) back to its variant.
    ///
    /// Matching is exact and case-sensitive. Any string that `as_str` never
    /// produces yields `None` instead of an error, so the caller decides how
    /// to treat an unrecognised status.
    #[must_use]
    pub fn from_db_str(value: &str) -> Option<Self> {
        match value {
            "started" => Some(Self::Started),
            "in_progress" => Some(Self::InProgress),
            "completed" => Some(Self::Completed),
            "failed" => Some(Self::Failed),
            "cancelled" => Some(Self::Cancelled),
            _ => None,
        }
    }

    /// Whether this status counts as "running", i.e. is `Started` or
    /// `InProgress`. This is the same status set that
    /// `SyncOperationRepository::running_for_user` is documented to filter on.
    #[must_use]
    pub const fn is_running(&self) -> bool {
        matches!(self, Self::Started | Self::InProgress)
    }
}
34
35/// Snapshot of one `sync_operations` row (every column).
36#[derive(Debug, Clone)]
37pub struct SyncOperationRow {
38    pub id: Uuid,
39    pub run_id: String,
40    pub status: String,
41    pub progress_percentage: u32,
42    pub dataset_ids: Vec<Uuid>,
43    pub dataset_names: Vec<String>,
44    pub user_id: Uuid,
45    pub created_at: DateTime<Utc>,
46    pub started_at: Option<DateTime<Utc>>,
47    pub completed_at: Option<DateTime<Utc>>,
48    pub total_records_to_sync: Option<i32>,
49    pub total_records_to_download: Option<i32>,
50    pub total_records_to_upload: Option<i32>,
51    pub records_downloaded: i32,
52    pub records_uploaded: i32,
53    pub bytes_downloaded: i64,
54    pub bytes_uploaded: i64,
55    pub dataset_sync_hashes: Option<serde_json::Value>,
56    pub error_message: Option<String>,
57    pub retry_count: i32,
58}
59
60/// Persistence trait for the cloud sync router.
61#[async_trait]
62pub trait SyncOperationRepository: Send + Sync + 'static {
63    /// Insert a new row in `started` state with progress = 0%.
64    async fn create_operation(
65        &self,
66        run_id: &str,
67        dataset_ids: &[Uuid],
68        dataset_names: &[String],
69        user_id: Uuid,
70    ) -> Result<(), DatabaseError>;
71
72    /// Transition a row to `in_progress`, setting `started_at = NOW()`.
73    async fn mark_started(&self, run_id: &str) -> Result<(), DatabaseError>;
74
75    /// Transition a row to `completed`, set `completed_at = NOW()`,
76    /// progress = 100. Optional totals get persisted alongside.
77    async fn mark_completed(
78        &self,
79        run_id: &str,
80        records_uploaded: i32,
81        records_downloaded: i32,
82        bytes_uploaded: i64,
83        bytes_downloaded: i64,
84        dataset_sync_hashes: Option<serde_json::Value>,
85    ) -> Result<(), DatabaseError>;
86
87    /// Transition a row to `failed`, set `completed_at = NOW()`, copy the
88    /// error message into `error_message`.
89    async fn mark_failed(&self, run_id: &str, error_message: &str) -> Result<(), DatabaseError>;
90
91    /// Update progress only (for the background task's tick callback).
92    async fn update_progress(&self, run_id: &str, percent: u32) -> Result<(), DatabaseError>;
93
94    /// All rows for `user_id` with status in `('started', 'in_progress')`,
95    /// ordered DESC by `created_at`.
96    async fn running_for_user(&self, user_id: Uuid)
97    -> Result<Vec<SyncOperationRow>, DatabaseError>;
98
99    /// Look up one row by its `run_id`.
100    async fn get_by_run_id(&self, run_id: &str) -> Result<Option<SyncOperationRow>, DatabaseError>;
101}