Skip to main content

codetether_agent/okr/
persistence.rs

1//! OKR Persistence Layer
2//!
3//! Provides file-based persistence for OKR entities using Config::data_dir().
4//! Supports CRUD operations for ID, run, and status queries.
5
6use crate::okr::{Okr, OkrRun, OkrRunStatus, OkrStatus};
7use anyhow::{Context, Result};
8use std::path::PathBuf;
9use std::sync::Arc;
10use tokio::fs;
11use tokio::sync::RwLock;
12use uuid::Uuid;
13
14/// Repository for OKR persistence operations
15pub struct OkrRepository {
16    base_path: PathBuf,
17    cache: Arc<RwLock<OkrCache>>,
18}
19
20/// In-memory cache for OKR data
21#[derive(Debug, Default)]
22struct OkrCache {
23    okrs: Vec<Okr>,
24    runs: Vec<OkrRun>,
25}
26
27impl OkrRepository {
28    /// Create a new OKR repository with the given base path
29    pub fn new(base_path: PathBuf) -> Self {
30        let repo = Self {
31            base_path,
32            cache: Arc::new(RwLock::new(OkrCache::default())),
33        };
34        repo
35    }
36
37    /// Create repository from Config::data_dir() or fallback to temp
38    pub async fn from_config() -> Result<Self> {
39        let base_path = crate::config::Config::data_dir()
40            .map(|p| p.join("okr"))
41            .unwrap_or_else(|| std::env::temp_dir().join("codetether").join("okr"));
42
43        // Ensure directory exists
44        fs::create_dir_all(&base_path)
45            .await
46            .context("Failed to create OKR data directory")?;
47
48        tracing::info!(path = %base_path.display(), "OKR repository initialized");
49
50        Ok(Self::new(base_path))
51    }
52
53    /// Get the file path for an OKR
54    fn okr_path(&self, id: Uuid) -> PathBuf {
55        self.base_path
56            .join("objectives")
57            .join(format!("{}.json", id))
58    }
59
60    /// Get the file path for an OKR run
61    fn run_path(&self, id: Uuid) -> PathBuf {
62        self.base_path.join("runs").join(format!("{}.json", id))
63    }
64
65    /// Get the directory path for OKRs
66    fn okrs_dir(&self) -> PathBuf {
67        self.base_path.join("objectives")
68    }
69
70    /// Get the directory path for runs
71    fn runs_dir(&self) -> PathBuf {
72        self.base_path.join("runs")
73    }
74
75    // ============ OKR CRUD ============
76
77    /// Create a new OKR
78    pub async fn create_okr(&self, okr: Okr) -> Result<Okr> {
79        // Validate
80        okr.validate()?;
81
82        // Ensure directory exists
83        fs::create_dir_all(self.okrs_dir()).await?;
84
85        // Write to file (atomic: tmp + rename)
86        let path = self.okr_path(okr.id);
87        let tmp_path = path.with_extension("json.tmp");
88        let json = serde_json::to_string_pretty(&okr)?;
89        fs::write(&tmp_path, &json).await?;
90        fs::rename(&tmp_path, &path).await?;
91
92        // Update cache
93        let mut cache = self.cache.write().await;
94        cache.okrs.push(okr.clone());
95
96        tracing::info!(okr_id = %okr.id, title = %okr.title, "OKR created");
97        Ok(okr)
98    }
99
100    /// Get an OKR by ID
101    pub async fn get_okr(&self, id: Uuid) -> Result<Option<Okr>> {
102        // Check cache first
103        {
104            let cache = self.cache.read().await;
105            if let Some(okr) = cache.okrs.iter().find(|o| o.id == id) {
106                return Ok(Some(okr.clone()));
107            }
108        }
109
110        // Load from disk
111        let path = self.okr_path(id);
112        if !path.exists() {
113            return Ok(None);
114        }
115
116        let data = fs::read_to_string(&path).await?;
117        let okr: Okr = serde_json::from_str(&data).context("Failed to parse OKR")?;
118
119        // Update cache
120        let mut cache = self.cache.write().await;
121        if !cache.okrs.iter().any(|o| o.id == okr.id) {
122            cache.okrs.push(okr.clone());
123        }
124
125        Ok(Some(okr))
126    }
127
128    /// Update an existing OKR
129    pub async fn update_okr(&self, mut okr: Okr) -> Result<Okr> {
130        okr.updated_at = chrono::Utc::now();
131
132        // Validate
133        okr.validate()?;
134
135        // Write to file
136        let path = self.okr_path(okr.id);
137        fs::create_dir_all(self.okrs_dir()).await?;
138        let json = serde_json::to_string_pretty(&okr)?;
139        fs::write(&path, &json).await?;
140
141        // Update cache
142        let mut cache = self.cache.write().await;
143        if let Some(existing) = cache.okrs.iter_mut().find(|o| o.id == okr.id) {
144            *existing = okr.clone();
145        }
146
147        tracing::info!(okr_id = %okr.id, "OKR updated");
148        Ok(okr)
149    }
150
151    /// Delete an OKR by ID
152    pub async fn delete_okr(&self, id: Uuid) -> Result<bool> {
153        let path = self.okr_path(id);
154
155        if !path.exists() {
156            return Ok(false);
157        }
158
159        fs::remove_file(&path).await?;
160
161        // Remove from cache
162        let mut cache = self.cache.write().await;
163        cache.okrs.retain(|o| o.id != id);
164
165        tracing::info!(okr_id = %id, "OKR deleted");
166        Ok(true)
167    }
168
169    /// List all OKRs
170    pub async fn list_okrs(&self) -> Result<Vec<Okr>> {
171        // Check cache first
172        {
173            let cache = self.cache.read().await;
174            if !cache.okrs.is_empty() {
175                return Ok(cache.okrs.clone());
176            }
177        }
178
179        // Load from disk
180        let dir = self.okrs_dir();
181        if !dir.exists() {
182            return Ok(Vec::new());
183        }
184
185        let mut okrs = Vec::new();
186        let mut entries = fs::read_dir(&dir).await?;
187
188        while let Some(entry) = entries.next_entry().await? {
189            let path = entry.path();
190            if path.extension().map_or(false, |e| e == "json") {
191                if let Ok(data) = fs::read_to_string(&path).await {
192                    if let Ok(okr) = serde_json::from_str::<Okr>(&data) {
193                        okrs.push(okr);
194                    }
195                }
196            }
197        }
198
199        // Update cache
200        let mut cache = self.cache.write().await;
201        cache.okrs = okrs.clone();
202
203        Ok(okrs)
204    }
205
206    /// Query OKRs by status
207    pub async fn query_okrs_by_status(&self, status: OkrStatus) -> Result<Vec<Okr>> {
208        let all = self.list_okrs().await?;
209        Ok(all.into_iter().filter(|o| o.status == status).collect())
210    }
211
212    /// Query OKRs by owner
213    pub async fn query_okrs_by_owner(&self, owner: &str) -> Result<Vec<Okr>> {
214        let all = self.list_okrs().await?;
215        Ok(all
216            .into_iter()
217            .filter(|o| o.owner.as_deref() == Some(owner))
218            .collect())
219    }
220
221    /// Query OKRs by tenant
222    pub async fn query_okrs_by_tenant(&self, tenant_id: &str) -> Result<Vec<Okr>> {
223        let all = self.list_okrs().await?;
224        Ok(all
225            .into_iter()
226            .filter(|o| o.tenant_id.as_deref() == Some(tenant_id))
227            .collect())
228    }
229
230    // ============ OKR Run CRUD ============
231
232    /// Create a new OKR run
233    pub async fn create_run(&self, run: OkrRun) -> Result<OkrRun> {
234        // Validate
235        run.validate()?;
236
237        // Ensure directory exists
238        fs::create_dir_all(self.runs_dir()).await?;
239
240        // Write to file
241        let path = self.run_path(run.id);
242        let json = serde_json::to_string_pretty(&run)?;
243        fs::write(&path, &json).await?;
244
245        // Update cache
246        let mut cache = self.cache.write().await;
247        cache.runs.push(run.clone());
248
249        tracing::info!(run_id = %run.id, okr_id = %run.okr_id, name = %run.name, "OKR run created");
250        Ok(run)
251    }
252
253    /// Get a run by ID
254    pub async fn get_run(&self, id: Uuid) -> Result<Option<OkrRun>> {
255        // Check cache
256        {
257            let cache = self.cache.read().await;
258            if let Some(run) = cache.runs.iter().find(|r| r.id == id) {
259                return Ok(Some(run.clone()));
260            }
261        }
262
263        // Load from disk
264        let path = self.run_path(id);
265        if !path.exists() {
266            return Ok(None);
267        }
268
269        let data = fs::read_to_string(&path).await?;
270        let run: OkrRun = serde_json::from_str(&data).context("Failed to parse OKR run")?;
271
272        // Update cache
273        let mut cache = self.cache.write().await;
274        if !cache.runs.iter().any(|r| r.id == run.id) {
275            cache.runs.push(run.clone());
276        }
277
278        Ok(Some(run))
279    }
280
281    /// Update a run
282    pub async fn update_run(&self, mut run: OkrRun) -> Result<OkrRun> {
283        run.updated_at = chrono::Utc::now();
284
285        // Write to file
286        let path = self.run_path(run.id);
287        fs::create_dir_all(self.runs_dir()).await?;
288        let json = serde_json::to_string_pretty(&run)?;
289        fs::write(&path, &json).await?;
290
291        // Update cache
292        let mut cache = self.cache.write().await;
293        if let Some(existing) = cache.runs.iter_mut().find(|r| r.id == run.id) {
294            *existing = run.clone();
295        }
296
297        tracing::info!(run_id = %run.id, "OKR run updated");
298        Ok(run)
299    }
300
301    /// Delete a run
302    pub async fn delete_run(&self, id: Uuid) -> Result<bool> {
303        let path = self.run_path(id);
304
305        if !path.exists() {
306            return Ok(false);
307        }
308
309        fs::remove_file(&path).await?;
310
311        // Remove from cache
312        let mut cache = self.cache.write().await;
313        cache.runs.retain(|r| r.id != id);
314
315        tracing::info!(run_id = %id, "OKR run deleted");
316        Ok(true)
317    }
318
319    /// List all runs
320    pub async fn list_runs(&self) -> Result<Vec<OkrRun>> {
321        // Check cache
322        {
323            let cache = self.cache.read().await;
324            if !cache.runs.is_empty() {
325                return Ok(cache.runs.clone());
326            }
327        }
328
329        // Load from disk
330        let dir = self.runs_dir();
331        if !dir.exists() {
332            return Ok(Vec::new());
333        }
334
335        let mut runs = Vec::new();
336        let mut entries = fs::read_dir(&dir).await?;
337
338        while let Some(entry) = entries.next_entry().await? {
339            let path = entry.path();
340            if path.extension().map_or(false, |e| e == "json") {
341                if let Ok(data) = fs::read_to_string(&path).await {
342                    if let Ok(run) = serde_json::from_str::<OkrRun>(&data) {
343                        runs.push(run);
344                    }
345                }
346            }
347        }
348
349        // Update cache
350        let mut cache = self.cache.write().await;
351        cache.runs = runs.clone();
352
353        Ok(runs)
354    }
355
356    /// Query runs by OKR ID
357    pub async fn query_runs_by_okr(&self, okr_id: Uuid) -> Result<Vec<OkrRun>> {
358        let all = self.list_runs().await?;
359        Ok(all.into_iter().filter(|r| r.okr_id == okr_id).collect())
360    }
361
362    /// Query runs by status
363    pub async fn query_runs_by_status(&self, status: OkrRunStatus) -> Result<Vec<OkrRun>> {
364        let all = self.list_runs().await?;
365        Ok(all.into_iter().filter(|r| r.status == status).collect())
366    }
367
368    /// Query runs by correlation ID
369    pub async fn query_runs_by_correlation(&self, correlation_id: &str) -> Result<Vec<OkrRun>> {
370        let all = self.list_runs().await?;
371        Ok(all
372            .into_iter()
373            .filter(|r| r.correlation_id.as_deref() == Some(correlation_id))
374            .collect())
375    }
376
377    /// Query runs by relay checkpoint ID
378    pub async fn query_runs_by_checkpoint(&self, checkpoint_id: &str) -> Result<Vec<OkrRun>> {
379        let all = self.list_runs().await?;
380        Ok(all
381            .into_iter()
382            .filter(|r| r.relay_checkpoint_id.as_deref() == Some(checkpoint_id))
383            .collect())
384    }
385
386    /// Query runs by session ID
387    pub async fn query_runs_by_session(&self, session_id: &str) -> Result<Vec<OkrRun>> {
388        let all = self.list_runs().await?;
389        Ok(all
390            .into_iter()
391            .filter(|r| r.session_id.as_deref() == Some(session_id))
392            .collect())
393    }
394
395    // ============ Utility Methods ============
396
397    /// Clear the in-memory cache (useful for forcing reload from disk)
398    pub async fn clear_cache(&self) {
399        let mut cache = self.cache.write().await;
400        cache.okrs.clear();
401        cache.runs.clear();
402        tracing::debug!("OKR repository cache cleared");
403    }
404
405    /// Get statistics about the repository
406    pub async fn stats(&self) -> Result<OkrRepositoryStats> {
407        let okrs = self.list_okrs().await?;
408        let runs = self.list_runs().await?;
409
410        let okr_status_counts = okrs
411            .iter()
412            .fold(std::collections::HashMap::new(), |mut acc, o| {
413                *acc.entry(o.status).or_insert(0) += 1;
414                acc
415            });
416
417        let run_status_counts = runs
418            .iter()
419            .fold(std::collections::HashMap::new(), |mut acc, r| {
420                *acc.entry(r.status).or_insert(0) += 1;
421                acc
422            });
423
424        Ok(OkrRepositoryStats {
425            total_okrs: okrs.len(),
426            total_runs: runs.len(),
427            okr_status_counts,
428            run_status_counts,
429        })
430    }
431}
432
433/// Statistics about the OKR repository
434#[derive(Debug, serde::Serialize)]
435pub struct OkrRepositoryStats {
436    pub total_okrs: usize,
437    pub total_runs: usize,
438    pub okr_status_counts: std::collections::HashMap<OkrStatus, usize>,
439    pub run_status_counts: std::collections::HashMap<OkrRunStatus, usize>,
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445    use crate::okr::KeyResult;
446
447    fn temp_repo() -> OkrRepository {
448        let temp_dir = std::env::temp_dir().join(format!("okr_test_{}", Uuid::new_v4()));
449        OkrRepository::new(temp_dir)
450    }
451
452    #[tokio::test]
453    async fn test_create_and_get_okr() {
454        let repo = temp_repo();
455
456        let mut okr = Okr::new("Test Objective", "Description");
457        let kr = KeyResult::new(okr.id, "KR1", 100.0, "%");
458        okr.add_key_result(kr);
459
460        let created = repo.create_okr(okr).await.unwrap();
461
462        let fetched = repo.get_okr(created.id).await.unwrap();
463        assert!(fetched.is_some());
464        assert_eq!(fetched.unwrap().title, "Test Objective");
465    }
466
467    #[tokio::test]
468    async fn test_update_okr() {
469        let repo = temp_repo();
470
471        let mut okr = Okr::new("Test", "Desc");
472        let kr = KeyResult::new(okr.id, "KR1", 100.0, "%");
473        okr.add_key_result(kr);
474
475        let created = repo.create_okr(okr).await.unwrap();
476
477        let mut to_update = created.clone();
478        to_update.title = "Updated Title".to_string();
479        to_update.status = OkrStatus::Active;
480
481        let updated = repo.update_okr(to_update).await.unwrap();
482        assert_eq!(updated.title, "Updated Title");
483        assert_eq!(updated.status, OkrStatus::Active);
484    }
485
486    #[tokio::test]
487    async fn test_delete_okr() {
488        let repo = temp_repo();
489
490        let mut okr = Okr::new("Test", "Desc");
491        let kr = KeyResult::new(okr.id, "KR1", 100.0, "%");
492        okr.add_key_result(kr);
493
494        let created = repo.create_okr(okr).await.unwrap();
495        let deleted = repo.delete_okr(created.id).await.unwrap();
496        assert!(deleted);
497
498        let fetched = repo.get_okr(created.id).await.unwrap();
499        assert!(fetched.is_none());
500    }
501
502    #[tokio::test]
503    async fn test_query_okrs_by_status() {
504        let repo = temp_repo();
505
506        let mut okr1 = Okr::new("Active OKR", "Desc");
507        let kr1 = KeyResult::new(okr1.id, "KR1", 100.0, "%");
508        okr1.add_key_result(kr1);
509        okr1.status = OkrStatus::Active;
510
511        let mut okr2 = Okr::new("Draft OKR", "Desc");
512        let kr2 = KeyResult::new(okr2.id, "KR2", 100.0, "%");
513        okr2.add_key_result(kr2);
514        okr2.status = OkrStatus::Draft;
515
516        repo.create_okr(okr1).await.unwrap();
517        repo.create_okr(okr2).await.unwrap();
518
519        let active = repo.query_okrs_by_status(OkrStatus::Active).await.unwrap();
520        assert_eq!(active.len(), 1);
521        assert_eq!(active[0].title, "Active OKR");
522    }
523
524    #[tokio::test]
525    async fn test_crud_runs() {
526        let repo = temp_repo();
527
528        // First create an OKR
529        let mut okr = Okr::new("Test OKR", "Desc");
530        let okr_id = okr.id;
531        let kr = KeyResult::new(okr.id, "KR1", 100.0, "%");
532        okr.add_key_result(kr);
533        repo.create_okr(okr).await.unwrap();
534
535        // Create a run
536        let mut run = OkrRun::new(okr_id, "Q1 Run");
537        run.correlation_id = Some("corr-123".to_string());
538        run.session_id = Some("session-456".to_string());
539
540        let created = repo.create_run(run).await.unwrap();
541
542        // Fetch by ID
543        let fetched = repo.get_run(created.id).await.unwrap();
544        assert!(fetched.is_some());
545        assert_eq!(fetched.unwrap().name, "Q1 Run");
546
547        // Query by OKR
548        let runs_for_okr = repo.query_runs_by_okr(okr_id).await.unwrap();
549        assert_eq!(runs_for_okr.len(), 1);
550
551        // Query by correlation
552        let runs_by_corr = repo.query_runs_by_correlation("corr-123").await.unwrap();
553        assert_eq!(runs_by_corr.len(), 1);
554
555        // Query by status
556        let pending = repo
557            .query_runs_by_status(OkrRunStatus::Draft)
558            .await
559            .unwrap();
560        assert_eq!(pending.len(), 1);
561    }
562
563    #[tokio::test]
564    async fn test_list_and_stats() {
565        let repo = temp_repo();
566
567        // Create OKRs
568        let mut okr = Okr::new("Test", "Desc");
569        let kr = KeyResult::new(okr.id, "KR", 100.0, "%");
570        okr.add_key_result(kr);
571        repo.create_okr(okr).await.unwrap();
572
573        // List
574        let all_okrs = repo.list_okrs().await.unwrap();
575        assert_eq!(all_okrs.len(), 1);
576
577        let all_runs = repo.list_runs().await.unwrap();
578        assert_eq!(all_runs.len(), 0);
579
580        // Stats
581        let stats = repo.stats().await.unwrap();
582        assert_eq!(stats.total_okrs, 1);
583    }
584}