Skip to main content

flow_db/
feature.rs

1use flow_core::{
2    CreateFeatureInput, DependencyRef, Feature, FeatureGraphNode, FeatureStats, Result,
3};
4use rusqlite::{Connection, OptionalExtension, Row};
5use std::collections::{HashMap, HashSet};
6
7pub struct FeatureStore;
8
9impl FeatureStore {
10    /// Create a new feature.
11    pub fn create(conn: &Connection, input: &CreateFeatureInput) -> Result<Feature> {
12        let priority = input.priority.map_or_else(
13            || {
14                // Auto-assign priority as MAX+1
15                let max_priority: Option<i32> = conn
16                    .query_row("SELECT MAX(priority) FROM features", [], |row| row.get(0))
17                    .ok()
18                    .flatten();
19                max_priority.unwrap_or(0) + 1
20            },
21            |p| p,
22        );
23
24        // Validate dependencies exist
25        for dep_ref in &input.dependencies {
26            if let DependencyRef::Id(dep_id) = dep_ref {
27                let exists: bool = conn
28                    .query_row(
29                        "SELECT EXISTS(SELECT 1 FROM features WHERE id = ?)",
30                        [dep_id],
31                        |row| row.get(0),
32                    )
33                    .map_err(|e| {
34                        flow_core::FlowError::Database(format!("dependency check failed: {e}"))
35                    })?;
36                if !exists {
37                    return Err(flow_core::FlowError::NotFound(format!(
38                        "dependency {dep_id} not found"
39                    )));
40                }
41            }
42        }
43
44        let deps_json = serde_json::to_string(
45            &input
46                .dependencies
47                .iter()
48                .filter_map(|d| match d {
49                    DependencyRef::Id(id) => Some(*id),
50                    DependencyRef::Index { .. } => None,
51                })
52                .collect::<Vec<_>>(),
53        )?;
54        let steps_json = serde_json::to_string(&input.steps)?;
55
56        conn.execute(
57            r"
58            INSERT INTO features (name, description, priority, category, steps, dependencies)
59            VALUES (?, ?, ?, ?, ?, ?)
60            ",
61            rusqlite::params![
62                &input.name,
63                &input.description,
64                priority,
65                &input.category,
66                steps_json,
67                deps_json,
68            ],
69        )
70        .map_err(|e| flow_core::FlowError::Database(format!("insert failed: {e}")))?;
71
72        let id = conn.last_insert_rowid();
73        Self::get_by_id(conn, id)?.ok_or_else(|| {
74            flow_core::FlowError::Database("feature not found after insert".to_string())
75        })
76    }
77
78    /// Create multiple features in a single transaction, resolving index-based dependencies.
79    #[allow(clippy::too_many_lines)]
80    pub fn create_bulk(conn: &Connection, inputs: &[CreateFeatureInput]) -> Result<Vec<Feature>> {
81        if inputs.is_empty() {
82            return Ok(vec![]);
83        }
84
85        // Validate all inputs first (before starting transaction)
86        for (idx, input) in inputs.iter().enumerate() {
87            for dep_ref in &input.dependencies {
88                match dep_ref {
89                    DependencyRef::Id(dep_id) => {
90                        let exists: bool = conn
91                            .query_row(
92                                "SELECT EXISTS(SELECT 1 FROM features WHERE id = ?)",
93                                [dep_id],
94                                |row| row.get(0),
95                            )
96                            .map_err(|e| {
97                                flow_core::FlowError::Database(format!(
98                                    "dependency check failed: {e}"
99                                ))
100                            })?;
101                        if !exists {
102                            return Err(flow_core::FlowError::NotFound(format!(
103                                "dependency {dep_id} not found for feature at index {idx}"
104                            )));
105                        }
106                    }
107                    DependencyRef::Index { index } => {
108                        if *index >= inputs.len() {
109                            return Err(flow_core::FlowError::BadRequest(format!(
110                                "index {index} out of bounds for feature at index {idx}"
111                            )));
112                        }
113                        // Reject self-references
114                        if *index == idx {
115                            return Err(flow_core::FlowError::BadRequest(format!(
116                                "feature at index {idx} cannot depend on itself"
117                            )));
118                        }
119                    }
120                }
121            }
122        }
123
124        // Begin transaction for atomicity
125        conn.execute_batch("BEGIN IMMEDIATE").map_err(|e| {
126            flow_core::FlowError::Database(format!("begin transaction failed: {e}"))
127        })?;
128
129        let result = (|| -> Result<Vec<Feature>> {
130            // Get starting priority
131            let max_priority: Option<i32> = conn
132                .query_row("SELECT MAX(priority) FROM features", [], |row| row.get(0))
133                .ok()
134                .flatten();
135            let mut next_priority = max_priority.unwrap_or(0) + 1;
136
137            // Insert all features, tracking their IDs
138            let mut feature_ids = Vec::new();
139            for input in inputs {
140                let priority = input.priority.unwrap_or_else(|| {
141                    let p = next_priority;
142                    next_priority += 1;
143                    p
144                });
145
146                let steps_json = serde_json::to_string(&input.steps)?;
147                let deps_json = "[]"; // Temporarily empty, will update after
148
149                conn.execute(
150                    r"
151                    INSERT INTO features (name, description, priority, category, steps, dependencies)
152                    VALUES (?, ?, ?, ?, ?, ?)
153                    ",
154                    rusqlite::params![
155                        &input.name,
156                        &input.description,
157                        priority,
158                        &input.category,
159                        steps_json,
160                        deps_json,
161                    ],
162                )
163                .map_err(|e| flow_core::FlowError::Database(format!("bulk insert failed: {e}")))?;
164
165                feature_ids.push(conn.last_insert_rowid());
166            }
167
168            // Resolve index-based dependencies to real IDs and update
169            for (idx, input) in inputs.iter().enumerate() {
170                let feature_id = feature_ids[idx];
171                let mut resolved_deps = Vec::new();
172
173                for dep_ref in &input.dependencies {
174                    match dep_ref {
175                        DependencyRef::Id(id) => resolved_deps.push(*id),
176                        DependencyRef::Index { index } => resolved_deps.push(feature_ids[*index]),
177                    }
178                }
179
180                if !resolved_deps.is_empty() {
181                    let deps_json = serde_json::to_string(&resolved_deps)?;
182                    conn.execute(
183                        "UPDATE features SET dependencies = ? WHERE id = ?",
184                        rusqlite::params![deps_json, feature_id],
185                    )
186                    .map_err(|e| {
187                        flow_core::FlowError::Database(format!("dependency update failed: {e}"))
188                    })?;
189                }
190            }
191
192            // Fetch and return all created features
193            feature_ids
194                .into_iter()
195                .map(|id| {
196                    Self::get_by_id(conn, id)?.ok_or_else(|| {
197                        flow_core::FlowError::Database("feature disappeared".to_string())
198                    })
199                })
200                .collect()
201        })();
202
203        match result {
204            Ok(features) => {
205                conn.execute_batch("COMMIT")
206                    .map_err(|e| flow_core::FlowError::Database(format!("commit failed: {e}")))?;
207                Ok(features)
208            }
209            Err(e) => {
210                let _ = conn.execute_batch("ROLLBACK");
211                Err(e)
212            }
213        }
214    }
215
216    /// Get a feature by ID.
217    pub fn get_by_id(conn: &Connection, id: i64) -> Result<Option<Feature>> {
218        let result = conn
219            .query_row("SELECT * FROM features WHERE id = ?", [id], row_to_feature)
220            .optional()
221            .map_err(|e| flow_core::FlowError::Database(format!("query failed: {e}")))?;
222        Ok(result)
223    }
224
225    /// Get all features.
226    pub fn get_all(conn: &Connection) -> Result<Vec<Feature>> {
227        let mut stmt = conn
228            .prepare("SELECT * FROM features ORDER BY priority ASC")
229            .map_err(|e| flow_core::FlowError::Database(format!("prepare failed: {e}")))?;
230
231        let features = stmt
232            .query_map([], row_to_feature)
233            .map_err(|e| flow_core::FlowError::Database(format!("query failed: {e}")))?
234            .collect::<std::result::Result<Vec<_>, _>>()
235            .map_err(|e| flow_core::FlowError::Database(format!("row parse failed: {e}")))?;
236
237        Ok(features)
238    }
239
240    /// Get feature statistics.
241    pub fn get_stats(conn: &Connection) -> Result<FeatureStats> {
242        let (total, passing, in_progress): (usize, usize, usize) = conn
243            .query_row(
244                r"
245                SELECT
246                    COUNT(*),
247                    SUM(CASE WHEN passes = 1 THEN 1 ELSE 0 END),
248                    SUM(CASE WHEN in_progress = 1 THEN 1 ELSE 0 END)
249                FROM features
250                ",
251                [],
252                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
253            )
254            .map_err(|e| flow_core::FlowError::Database(format!("stats query failed: {e}")))?;
255
256        let failing = total.saturating_sub(passing).saturating_sub(in_progress);
257
258        // Count blocked features (those with at least one failing dependency)
259        let all_features = Self::get_all(conn)?;
260        let blocked = all_features
261            .iter()
262            .filter(|f| {
263                f.dependencies.iter().any(|dep_id| {
264                    all_features
265                        .iter()
266                        .find(|d| d.id == *dep_id)
267                        .is_some_and(|d| !d.passes)
268                })
269            })
270            .count();
271
272        Ok(FeatureStats {
273            total,
274            passing,
275            failing,
276            in_progress,
277            blocked,
278        })
279    }
280
281    /// Get features that are ready to work on (not in progress, not passing, all deps pass).
282    pub fn get_ready(conn: &Connection) -> Result<Vec<Feature>> {
283        let all_features = Self::get_all(conn)?;
284        let passing_ids: HashSet<i64> = all_features
285            .iter()
286            .filter(|f| f.passes)
287            .map(|f| f.id)
288            .collect();
289
290        let ready: Vec<Feature> = all_features
291            .into_iter()
292            .filter(|f| {
293                !f.in_progress
294                    && !f.passes
295                    && f.dependencies
296                        .iter()
297                        .all(|dep_id| passing_ids.contains(dep_id))
298            })
299            .collect();
300
301        Ok(ready)
302    }
303
304    /// Get features that are blocked by failing dependencies.
305    pub fn get_blocked(conn: &Connection) -> Result<Vec<Feature>> {
306        let all_features = Self::get_all(conn)?;
307        let passing_ids: HashSet<i64> = all_features
308            .iter()
309            .filter(|f| f.passes)
310            .map(|f| f.id)
311            .collect();
312
313        let blocked: Vec<Feature> = all_features
314            .into_iter()
315            .filter(|f| {
316                !f.dependencies.is_empty()
317                    && f.dependencies
318                        .iter()
319                        .any(|dep_id| !passing_ids.contains(dep_id))
320            })
321            .collect();
322
323        Ok(blocked)
324    }
325
326    /// Atomically claim a feature and mark it in progress.
327    pub fn claim_and_get(conn: &Connection, id: i64) -> Result<Feature> {
328        let rows_affected = conn
329            .execute(
330                r"
331                UPDATE features
332                SET in_progress = 1, updated_at = datetime('now')
333                WHERE id = ? AND in_progress = 0 AND passes = 0
334                ",
335                [id],
336            )
337            .map_err(|e| flow_core::FlowError::Database(format!("claim failed: {e}")))?;
338
339        if rows_affected == 0 {
340            return Err(flow_core::FlowError::Conflict(format!(
341                "feature {id} is already claimed or completed"
342            )));
343        }
344
345        Self::get_by_id(conn, id)?
346            .ok_or_else(|| flow_core::FlowError::NotFound(format!("feature {id} not found")))
347    }
348
349    /// Mark a feature as passing.
350    pub fn mark_passing(conn: &Connection, id: i64) -> Result<()> {
351        let rows = conn
352            .execute(
353                "UPDATE features SET passes = 1, in_progress = 0, updated_at = datetime('now') WHERE id = ?",
354                [id],
355            )
356            .map_err(|e| flow_core::FlowError::Database(format!("update failed: {e}")))?;
357        if rows == 0 {
358            return Err(flow_core::FlowError::NotFound(format!(
359                "feature {id} not found"
360            )));
361        }
362        Ok(())
363    }
364
365    /// Mark a feature as failing.
366    pub fn mark_failing(conn: &Connection, id: i64) -> Result<()> {
367        let rows = conn
368            .execute(
369                "UPDATE features SET passes = 0, in_progress = 0, updated_at = datetime('now') WHERE id = ?",
370                [id],
371            )
372            .map_err(|e| flow_core::FlowError::Database(format!("update failed: {e}")))?;
373        if rows == 0 {
374            return Err(flow_core::FlowError::NotFound(format!(
375                "feature {id} not found"
376            )));
377        }
378        Ok(())
379    }
380
381    /// Mark a feature as in progress (atomic).
382    pub fn mark_in_progress(conn: &Connection, id: i64) -> Result<()> {
383        let rows = conn
384            .execute(
385                "UPDATE features SET in_progress = 1, updated_at = datetime('now') WHERE id = ? AND in_progress = 0",
386                [id],
387            )
388            .map_err(|e| flow_core::FlowError::Database(format!("update failed: {e}")))?;
389
390        if rows == 0 {
391            return Err(flow_core::FlowError::Conflict(format!(
392                "feature {id} is already in progress"
393            )));
394        }
395        Ok(())
396    }
397
398    /// Clear in-progress flag.
399    pub fn clear_in_progress(conn: &Connection, id: i64) -> Result<()> {
400        conn.execute(
401            "UPDATE features SET in_progress = 0, updated_at = datetime('now') WHERE id = ?",
402            [id],
403        )
404        .map_err(|e| flow_core::FlowError::Database(format!("update failed: {e}")))?;
405        Ok(())
406    }
407
408    /// Skip a feature by moving it to the end of the priority queue.
409    pub fn skip(conn: &Connection, id: i64) -> Result<()> {
410        let max_priority: Option<i32> = conn
411            .query_row("SELECT MAX(priority) FROM features", [], |row| row.get(0))
412            .ok()
413            .flatten();
414        let new_priority = max_priority.unwrap_or(0) + 1;
415
416        conn.execute(
417            "UPDATE features SET priority = ?, updated_at = datetime('now') WHERE id = ?",
418            rusqlite::params![new_priority, id],
419        )
420        .map_err(|e| flow_core::FlowError::Database(format!("skip failed: {e}")))?;
421        Ok(())
422    }
423
424    /// Add a dependency to a feature.
425    /// Uses SAVEPOINT for atomic read-modify-write to prevent TOCTOU races.
426    pub fn add_dependency(conn: &Connection, feature_id: i64, dep_id: i64) -> Result<()> {
427        conn.execute_batch("SAVEPOINT add_dep")
428            .map_err(|e| flow_core::FlowError::Database(format!("savepoint failed: {e}")))?;
429
430        let result = (|| -> Result<()> {
431            // Verify dependency exists
432            let exists: bool = conn
433                .query_row(
434                    "SELECT EXISTS(SELECT 1 FROM features WHERE id = ?)",
435                    [dep_id],
436                    |row| row.get(0),
437                )
438                .map_err(|e| flow_core::FlowError::Database(format!("exists check failed: {e}")))?;
439
440            if !exists {
441                return Err(flow_core::FlowError::NotFound(format!(
442                    "dependency {dep_id} not found"
443                )));
444            }
445
446            // Get current dependencies (within the savepoint for consistency)
447            let deps_json: String = conn
448                .query_row(
449                    "SELECT dependencies FROM features WHERE id = ?",
450                    [feature_id],
451                    |row| row.get(0),
452                )
453                .map_err(|e| flow_core::FlowError::Database(format!("query failed: {e}")))?;
454
455            let mut deps: Vec<i64> = serde_json::from_str(&deps_json)?;
456
457            if !deps.contains(&dep_id) {
458                deps.push(dep_id);
459                let new_deps_json = serde_json::to_string(&deps)?;
460                conn.execute(
461                    "UPDATE features SET dependencies = ?, updated_at = datetime('now') WHERE id = ?",
462                    rusqlite::params![new_deps_json, feature_id],
463                )
464                .map_err(|e| {
465                    flow_core::FlowError::Database(format!("dependency add failed: {e}"))
466                })?;
467            }
468
469            Ok(())
470        })();
471
472        match result {
473            Ok(()) => {
474                conn.execute_batch("RELEASE SAVEPOINT add_dep")
475                    .map_err(|e| flow_core::FlowError::Database(format!("release failed: {e}")))?;
476                Ok(())
477            }
478            Err(e) => {
479                let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT add_dep");
480                Err(e)
481            }
482        }
483    }
484
485    /// Remove a dependency from a feature.
486    /// Uses SAVEPOINT for atomic read-modify-write to prevent TOCTOU races.
487    pub fn remove_dependency(conn: &Connection, feature_id: i64, dep_id: i64) -> Result<()> {
488        conn.execute_batch("SAVEPOINT rm_dep")
489            .map_err(|e| flow_core::FlowError::Database(format!("savepoint failed: {e}")))?;
490
491        let result = (|| -> Result<()> {
492            let deps_json: String = conn
493                .query_row(
494                    "SELECT dependencies FROM features WHERE id = ?",
495                    [feature_id],
496                    |row| row.get(0),
497                )
498                .map_err(|e| flow_core::FlowError::Database(format!("query failed: {e}")))?;
499
500            let mut deps: Vec<i64> = serde_json::from_str(&deps_json)?;
501            deps.retain(|&id| id != dep_id);
502
503            let new_deps_json = serde_json::to_string(&deps)?;
504            conn.execute(
505                "UPDATE features SET dependencies = ?, updated_at = datetime('now') WHERE id = ?",
506                rusqlite::params![new_deps_json, feature_id],
507            )
508            .map_err(|e| {
509                flow_core::FlowError::Database(format!("dependency remove failed: {e}"))
510            })?;
511
512            Ok(())
513        })();
514
515        match result {
516            Ok(()) => {
517                conn.execute_batch("RELEASE SAVEPOINT rm_dep")
518                    .map_err(|e| flow_core::FlowError::Database(format!("release failed: {e}")))?;
519                Ok(())
520            }
521            Err(e) => {
522                let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT rm_dep");
523                Err(e)
524            }
525        }
526    }
527
528    /// Set all dependencies for a feature (replaces existing).
529    /// Uses SAVEPOINT for atomic validation + update.
530    pub fn set_dependencies(conn: &Connection, feature_id: i64, dep_ids: &[i64]) -> Result<()> {
531        conn.execute_batch("SAVEPOINT set_deps")
532            .map_err(|e| flow_core::FlowError::Database(format!("savepoint failed: {e}")))?;
533
534        let result = (|| -> Result<()> {
535            // Verify all dependencies exist
536            for dep_id in dep_ids {
537                let exists: bool = conn
538                    .query_row(
539                        "SELECT EXISTS(SELECT 1 FROM features WHERE id = ?)",
540                        [dep_id],
541                        |row| row.get(0),
542                    )
543                    .map_err(|e| {
544                        flow_core::FlowError::Database(format!("exists check failed: {e}"))
545                    })?;
546
547                if !exists {
548                    return Err(flow_core::FlowError::NotFound(format!(
549                        "dependency {dep_id} not found"
550                    )));
551                }
552            }
553
554            let deps_json = serde_json::to_string(dep_ids)?;
555            conn.execute(
556                "UPDATE features SET dependencies = ?, updated_at = datetime('now') WHERE id = ?",
557                rusqlite::params![deps_json, feature_id],
558            )
559            .map_err(|e| flow_core::FlowError::Database(format!("set dependencies failed: {e}")))?;
560
561            Ok(())
562        })();
563
564        match result {
565            Ok(()) => {
566                conn.execute_batch("RELEASE SAVEPOINT set_deps")
567                    .map_err(|e| flow_core::FlowError::Database(format!("release failed: {e}")))?;
568                Ok(())
569            }
570            Err(e) => {
571                let _ = conn.execute_batch("ROLLBACK TO SAVEPOINT set_deps");
572                Err(e)
573            }
574        }
575    }
576
577    /// Get the complete dependency graph with dependents computed.
578    pub fn get_graph(conn: &Connection) -> Result<Vec<FeatureGraphNode>> {
579        let features = Self::get_all(conn)?;
580        let passing_ids: HashSet<i64> =
581            features.iter().filter(|f| f.passes).map(|f| f.id).collect();
582
583        // Build reverse dependency map (dependents)
584        let mut dependents_map: HashMap<i64, Vec<i64>> = HashMap::new();
585        for feature in &features {
586            for dep_id in &feature.dependencies {
587                dependents_map.entry(*dep_id).or_default().push(feature.id);
588            }
589        }
590
591        let graph: Vec<FeatureGraphNode> = features
592            .into_iter()
593            .map(|f| {
594                let blocked = !f.dependencies.is_empty()
595                    && f.dependencies
596                        .iter()
597                        .any(|dep_id| !passing_ids.contains(dep_id));
598
599                FeatureGraphNode {
600                    id: f.id,
601                    name: f.name,
602                    category: f.category,
603                    priority: f.priority,
604                    passes: f.passes,
605                    in_progress: f.in_progress,
606                    blocked,
607                    dependencies: f.dependencies,
608                    dependents: dependents_map.get(&f.id).cloned().unwrap_or_default(),
609                }
610            })
611            .collect();
612
613        Ok(graph)
614    }
615}
616
617/// Helper to convert a database row to a Feature.
618fn row_to_feature(row: &Row) -> rusqlite::Result<Feature> {
619    let steps_json: String = row.get("steps")?;
620    let steps: Vec<String> = serde_json::from_str(&steps_json).map_err(|e| {
621        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
622    })?;
623
624    let deps_json: String = row.get("dependencies")?;
625    let dependencies: Vec<i64> = serde_json::from_str(&deps_json).map_err(|e| {
626        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
627    })?;
628
629    let passes_int: i32 = row.get("passes")?;
630    let in_progress_int: i32 = row.get("in_progress")?;
631
632    Ok(Feature {
633        id: row.get("id")?,
634        priority: row.get("priority")?,
635        category: row.get("category")?,
636        name: row.get("name")?,
637        description: row.get("description")?,
638        steps,
639        passes: passes_int != 0,
640        in_progress: in_progress_int != 0,
641        dependencies,
642        created_at: row.get("created_at")?,
643        updated_at: row.get("updated_at")?,
644    })
645}
646
647#[cfg(test)]
648#[allow(clippy::significant_drop_tightening)]
649mod tests {
650    use super::*;
651    use crate::Database;
652
653    #[test]
654    fn test_create_and_get() {
655        let db = Database::open_in_memory().unwrap();
656        let conn = db.writer().lock().unwrap();
657
658        let feature = FeatureStore::create(
659            &conn,
660            &CreateFeatureInput {
661                name: "Test Feature".to_string(),
662                description: "Test Description".to_string(),
663                priority: Some(1),
664                category: "Test".to_string(),
665                steps: vec!["Step 1".to_string(), "Step 2".to_string()],
666                dependencies: vec![],
667            },
668        )
669        .unwrap();
670
671        assert_eq!(feature.id, 1);
672        assert_eq!(feature.name, "Test Feature");
673        assert_eq!(feature.priority, 1);
674        assert_eq!(feature.steps.len(), 2);
675
676        let retrieved = FeatureStore::get_by_id(&conn, 1).unwrap().unwrap();
677        assert_eq!(retrieved.name, "Test Feature");
678    }
679
680    #[test]
681    fn test_bulk_create_with_index_dependencies() {
682        let db = Database::open_in_memory().unwrap();
683        let conn = db.writer().lock().unwrap();
684
685        let inputs = vec![
686            CreateFeatureInput {
687                name: "Feature A".to_string(),
688                description: String::new(),
689                priority: None,
690                category: String::new(),
691                steps: vec![],
692                dependencies: vec![],
693            },
694            CreateFeatureInput {
695                name: "Feature B".to_string(),
696                description: String::new(),
697                priority: None,
698                category: String::new(),
699                steps: vec![],
700                dependencies: vec![DependencyRef::Index { index: 0 }],
701            },
702            CreateFeatureInput {
703                name: "Feature C".to_string(),
704                description: String::new(),
705                priority: None,
706                category: String::new(),
707                steps: vec![],
708                dependencies: vec![
709                    DependencyRef::Index { index: 0 },
710                    DependencyRef::Index { index: 1 },
711                ],
712            },
713        ];
714
715        let features = FeatureStore::create_bulk(&conn, &inputs).unwrap();
716        assert_eq!(features.len(), 3);
717
718        // Feature B should depend on Feature A
719        assert_eq!(features[1].dependencies, vec![features[0].id]);
720
721        // Feature C should depend on both A and B
722        assert_eq!(
723            features[2].dependencies,
724            vec![features[0].id, features[1].id]
725        );
726    }
727
728    #[test]
729    fn test_claim_and_get() {
730        let db = Database::open_in_memory().unwrap();
731        let conn = db.writer().lock().unwrap();
732
733        let feature = FeatureStore::create(
734            &conn,
735            &CreateFeatureInput {
736                name: "Claimable".to_string(),
737                description: String::new(),
738                priority: Some(1),
739                category: String::new(),
740                steps: vec![],
741                dependencies: vec![],
742            },
743        )
744        .unwrap();
745
746        // First claim should succeed
747        let claimed = FeatureStore::claim_and_get(&conn, feature.id).unwrap();
748        assert!(claimed.in_progress);
749
750        // Second claim should fail
751        let result = FeatureStore::claim_and_get(&conn, feature.id);
752        assert!(result.is_err());
753        assert!(matches!(
754            result.unwrap_err(),
755            flow_core::FlowError::Conflict(_)
756        ));
757    }
758
759    #[test]
760    fn test_mark_passing_failing() {
761        let db = Database::open_in_memory().unwrap();
762        let conn = db.writer().lock().unwrap();
763
764        let feature = FeatureStore::create(
765            &conn,
766            &CreateFeatureInput {
767                name: "Test".to_string(),
768                description: String::new(),
769                priority: Some(1),
770                category: String::new(),
771                steps: vec![],
772                dependencies: vec![],
773            },
774        )
775        .unwrap();
776
777        // Mark passing
778        FeatureStore::mark_passing(&conn, feature.id).unwrap();
779        let updated = FeatureStore::get_by_id(&conn, feature.id).unwrap().unwrap();
780        assert!(updated.passes);
781        assert!(!updated.in_progress);
782
783        // Mark failing
784        FeatureStore::mark_failing(&conn, feature.id).unwrap();
785        let updated = FeatureStore::get_by_id(&conn, feature.id).unwrap().unwrap();
786        assert!(!updated.passes);
787        assert!(!updated.in_progress);
788    }
789
790    #[test]
791    fn test_dependencies() {
792        let db = Database::open_in_memory().unwrap();
793        let conn = db.writer().lock().unwrap();
794
795        let f1 = FeatureStore::create(
796            &conn,
797            &CreateFeatureInput {
798                name: "F1".to_string(),
799                description: String::new(),
800                priority: Some(1),
801                category: String::new(),
802                steps: vec![],
803                dependencies: vec![],
804            },
805        )
806        .unwrap();
807
808        let f2 = FeatureStore::create(
809            &conn,
810            &CreateFeatureInput {
811                name: "F2".to_string(),
812                description: String::new(),
813                priority: Some(2),
814                category: String::new(),
815                steps: vec![],
816                dependencies: vec![],
817            },
818        )
819        .unwrap();
820
821        // Add dependency
822        FeatureStore::add_dependency(&conn, f2.id, f1.id).unwrap();
823        let updated = FeatureStore::get_by_id(&conn, f2.id).unwrap().unwrap();
824        assert_eq!(updated.dependencies, vec![f1.id]);
825
826        // Remove dependency
827        FeatureStore::remove_dependency(&conn, f2.id, f1.id).unwrap();
828        let updated = FeatureStore::get_by_id(&conn, f2.id).unwrap().unwrap();
829        assert!(updated.dependencies.is_empty());
830
831        // Set dependencies
832        FeatureStore::set_dependencies(&conn, f2.id, &[f1.id]).unwrap();
833        let updated = FeatureStore::get_by_id(&conn, f2.id).unwrap().unwrap();
834        assert_eq!(updated.dependencies, vec![f1.id]);
835    }
836
837    #[test]
838    fn test_get_ready_and_blocked() {
839        let db = Database::open_in_memory().unwrap();
840        let conn = db.writer().lock().unwrap();
841
842        let f1 = FeatureStore::create(
843            &conn,
844            &CreateFeatureInput {
845                name: "F1".to_string(),
846                description: String::new(),
847                priority: Some(1),
848                category: String::new(),
849                steps: vec![],
850                dependencies: vec![],
851            },
852        )
853        .unwrap();
854
855        let f2 = FeatureStore::create(
856            &conn,
857            &CreateFeatureInput {
858                name: "F2".to_string(),
859                description: String::new(),
860                priority: Some(2),
861                category: String::new(),
862                steps: vec![],
863                dependencies: vec![DependencyRef::Id(f1.id)],
864            },
865        )
866        .unwrap();
867
868        // F1 should be ready (no deps), F2 should be blocked (F1 not passing)
869        let ready = FeatureStore::get_ready(&conn).unwrap();
870        assert_eq!(ready.len(), 1);
871        assert_eq!(ready[0].id, f1.id);
872
873        let blocked = FeatureStore::get_blocked(&conn).unwrap();
874        assert_eq!(blocked.len(), 1);
875        assert_eq!(blocked[0].id, f2.id);
876
877        // Mark F1 as passing
878        FeatureStore::mark_passing(&conn, f1.id).unwrap();
879
880        // Now F2 should be ready
881        let ready = FeatureStore::get_ready(&conn).unwrap();
882        assert_eq!(ready.len(), 1);
883        assert_eq!(ready[0].id, f2.id);
884
885        let blocked = FeatureStore::get_blocked(&conn).unwrap();
886        assert_eq!(blocked.len(), 0);
887    }
888
889    #[test]
890    fn test_get_stats() {
891        let db = Database::open_in_memory().unwrap();
892        let conn = db.writer().lock().unwrap();
893
894        let f1 = FeatureStore::create(
895            &conn,
896            &CreateFeatureInput {
897                name: "F1".to_string(),
898                description: String::new(),
899                priority: Some(1),
900                category: String::new(),
901                steps: vec![],
902                dependencies: vec![],
903            },
904        )
905        .unwrap();
906
907        let f2 = FeatureStore::create(
908            &conn,
909            &CreateFeatureInput {
910                name: "F2".to_string(),
911                description: String::new(),
912                priority: Some(2),
913                category: String::new(),
914                steps: vec![],
915                dependencies: vec![],
916            },
917        )
918        .unwrap();
919
920        FeatureStore::mark_passing(&conn, f1.id).unwrap();
921        FeatureStore::mark_in_progress(&conn, f2.id).unwrap();
922
923        let stats = FeatureStore::get_stats(&conn).unwrap();
924        assert_eq!(stats.total, 2);
925        assert_eq!(stats.passing, 1);
926        assert_eq!(stats.in_progress, 1);
927        assert_eq!(stats.failing, 0);
928    }
929
930    #[test]
931    fn test_get_graph() {
932        let db = Database::open_in_memory().unwrap();
933        let conn = db.writer().lock().unwrap();
934
935        let f1 = FeatureStore::create(
936            &conn,
937            &CreateFeatureInput {
938                name: "F1".to_string(),
939                description: String::new(),
940                priority: Some(1),
941                category: "Cat1".to_string(),
942                steps: vec![],
943                dependencies: vec![],
944            },
945        )
946        .unwrap();
947
948        let f2 = FeatureStore::create(
949            &conn,
950            &CreateFeatureInput {
951                name: "F2".to_string(),
952                description: String::new(),
953                priority: Some(2),
954                category: "Cat2".to_string(),
955                steps: vec![],
956                dependencies: vec![DependencyRef::Id(f1.id)],
957            },
958        )
959        .unwrap();
960
961        let graph = FeatureStore::get_graph(&conn).unwrap();
962        assert_eq!(graph.len(), 2);
963
964        // F1 should have F2 as a dependent
965        let f1_node = graph.iter().find(|n| n.id == f1.id).unwrap();
966        assert_eq!(f1_node.dependents, vec![f2.id]);
967
968        // F2 should be blocked (F1 not passing)
969        let f2_node = graph.iter().find(|n| n.id == f2.id).unwrap();
970        assert!(f2_node.blocked);
971        assert_eq!(f2_node.dependencies, vec![f1.id]);
972    }
973}