Skip to main content

dk_engine/
changeset.rs

1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use dk_core::{RepoId, SymbolId};
6
7/// Explicit changeset states. Replaces the former ambiguous "open" state.
8#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum ChangesetState {
10    Draft,
11    Submitted,
12    Verifying,
13    Approved,
14    Rejected,
15    Merged,
16    Closed,
17}
18
19impl ChangesetState {
20    /// Parse a state string from the database into a `ChangesetState`.
21    pub fn parse(s: &str) -> Option<Self> {
22        match s {
23            "draft" => Some(Self::Draft),
24            "submitted" => Some(Self::Submitted),
25            "verifying" => Some(Self::Verifying),
26            "approved" => Some(Self::Approved),
27            "rejected" => Some(Self::Rejected),
28            "merged" => Some(Self::Merged),
29            "closed" => Some(Self::Closed),
30            _ => None,
31        }
32    }
33
34    /// Return the database string representation.
35    pub fn as_str(&self) -> &'static str {
36        match self {
37            Self::Draft => "draft",
38            Self::Submitted => "submitted",
39            Self::Verifying => "verifying",
40            Self::Approved => "approved",
41            Self::Rejected => "rejected",
42            Self::Merged => "merged",
43            Self::Closed => "closed",
44        }
45    }
46
47    /// Check whether transitioning from `self` to `target` is valid.
48    ///
49    /// Valid transitions:
50    /// - draft      -> submitted
51    /// - submitted  -> verifying
52    /// - verifying  -> approved | rejected
53    /// - approved   -> merged
54    /// - any        -> closed
55    pub fn can_transition_to(&self, target: Self) -> bool {
56        if target == Self::Closed {
57            return true;
58        }
59        matches!(
60            (self, target),
61            (Self::Draft, Self::Submitted)
62                | (Self::Submitted, Self::Verifying)
63                | (Self::Verifying, Self::Approved)
64                | (Self::Verifying, Self::Rejected)
65                | (Self::Approved, Self::Merged)
66        )
67    }
68}
69
70impl std::fmt::Display for ChangesetState {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.write_str(self.as_str())
73    }
74}
75
76#[derive(Debug, Clone, sqlx::FromRow)]
77pub struct Changeset {
78    pub id: Uuid,
79    pub repo_id: RepoId,
80    pub number: i32,
81    pub title: String,
82    pub intent_summary: Option<String>,
83    pub source_branch: String,
84    pub target_branch: String,
85    pub state: String,
86    pub reason: String,
87    pub session_id: Option<Uuid>,
88    pub agent_id: Option<String>,
89    pub agent_name: Option<String>,
90    pub author_id: Option<Uuid>,
91    pub base_version: Option<String>,
92    pub merged_version: Option<String>,
93    pub created_at: DateTime<Utc>,
94    pub updated_at: DateTime<Utc>,
95    pub merged_at: Option<DateTime<Utc>>,
96}
97
98impl Changeset {
99    /// Parse the current state string into a typed `ChangesetState`.
100    pub fn parsed_state(&self) -> Option<ChangesetState> {
101        ChangesetState::parse(&self.state)
102    }
103
104    /// Validate and perform a state transition, recording the reason.
105    /// Returns an error if the transition is not allowed.
106    pub fn transition(
107        &mut self,
108        target: ChangesetState,
109        reason: impl Into<String>,
110    ) -> dk_core::Result<()> {
111        let current = self.parsed_state().ok_or_else(|| {
112            dk_core::Error::Internal(format!("unknown current state: '{}'", self.state))
113        })?;
114
115        if !current.can_transition_to(target) {
116            return Err(dk_core::Error::InvalidInput(format!(
117                "invalid state transition: '{}' -> '{}'",
118                current, target,
119            )));
120        }
121
122        self.state = target.as_str().to_string();
123        self.reason = reason.into();
124        Ok(())
125    }
126}
127
128#[derive(Debug, Clone)]
129pub struct ChangesetFile {
130    pub changeset_id: Uuid,
131    pub file_path: String,
132    pub operation: String,
133    pub content: Option<String>,
134}
135
136#[derive(Debug, Clone)]
137pub struct ChangesetFileMeta {
138    pub file_path: String,
139    pub operation: String,
140    pub size_bytes: i64,
141}
142
143pub struct ChangesetStore {
144    db: PgPool,
145}
146
147impl ChangesetStore {
148    pub fn new(db: PgPool) -> Self {
149        Self { db }
150    }
151
152    /// Create a changeset via the Agent Protocol path.
153    /// Auto-increments the number per repo using an advisory lock.
154    /// Sets `source_branch` to `agent/<agent_name>` and `target_branch` to `main`
155    /// so platform queries that read these NOT NULL columns always succeed.
156    /// `agent_name` is the human-readable name (e.g. "agent-1" or "feature-bot").
157    pub async fn create(
158        &self,
159        repo_id: RepoId,
160        session_id: Option<Uuid>,
161        agent_id: &str,
162        intent: &str,
163        base_version: Option<&str>,
164        agent_name: &str,
165    ) -> dk_core::Result<Changeset> {
166        let intent_slug: String = intent
167            .to_lowercase()
168            .chars()
169            .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
170            .collect::<String>()
171            .trim_matches('-')
172            .to_string();
173        let slug = if intent_slug.len() > 50 {
174            let cut = intent_slug
175                .char_indices()
176                .take_while(|(i, _)| *i < 50)
177                .last()
178                .map(|(i, c)| i + c.len_utf8())
179                .unwrap_or(0);
180            intent_slug[..cut].trim_end_matches('-').to_string()
181        } else {
182            intent_slug
183        };
184        let source_branch = format!("{}/{}", slug, agent_name);
185        let target_branch = "main";
186
187        let mut tx = self.db.begin().await?;
188
189        sqlx::query("SELECT pg_advisory_xact_lock(hashtext('changeset:' || $1::text))")
190            .bind(repo_id)
191            .execute(&mut *tx)
192            .await?;
193
194        let row: (Uuid, i32, String, String, DateTime<Utc>, DateTime<Utc>) = sqlx::query_as(
195            r#"INSERT INTO changesets
196                   (repo_id, number, title, intent_summary, source_branch, target_branch,
197                    state, reason, session_id, agent_id, agent_name, base_version)
198               SELECT $1, COALESCE(MAX(number), 0) + 1, $2, $2, $3, $4,
199                    'draft', 'created via agent connect', $5, $6, $7, $8
200               FROM changesets WHERE repo_id = $1
201               RETURNING id, number, state, reason, created_at, updated_at"#,
202        )
203        .bind(repo_id)
204        .bind(intent)
205        .bind(&source_branch)
206        .bind(target_branch)
207        .bind(session_id)
208        .bind(agent_id)
209        .bind(agent_name)
210        .bind(base_version)
211        .fetch_one(&mut *tx)
212        .await?;
213
214        tx.commit().await?;
215
216        Ok(Changeset {
217            id: row.0,
218            repo_id,
219            number: row.1,
220            title: intent.to_string(),
221            intent_summary: Some(intent.to_string()),
222            source_branch,
223            target_branch: target_branch.to_string(),
224            state: row.2,
225            reason: row.3,
226            session_id,
227            agent_id: Some(agent_id.to_string()),
228            agent_name: Some(agent_name.to_string()),
229            author_id: None,
230            base_version: base_version.map(String::from),
231            merged_version: None,
232            created_at: row.4,
233            updated_at: row.5,
234            merged_at: None,
235        })
236    }
237
238    pub async fn get(&self, id: Uuid) -> dk_core::Result<Changeset> {
239        sqlx::query_as::<_, Changeset>(
240            r#"SELECT id, repo_id, number, title, intent_summary,
241                      source_branch, target_branch, state, reason,
242                      session_id, agent_id, agent_name, author_id,
243                      base_version, merged_version,
244                      created_at, updated_at, merged_at
245               FROM changesets WHERE id = $1"#,
246        )
247        .bind(id)
248        .fetch_optional(&self.db)
249        .await?
250        .ok_or_else(|| dk_core::Error::Internal(format!("changeset {} not found", id)))
251    }
252
253    pub async fn update_status(&self, id: Uuid, status: &str) -> dk_core::Result<()> {
254        self.update_status_with_reason(id, status, "").await
255    }
256
257    /// Update changeset status and record the reason for the transition.
258    pub async fn update_status_with_reason(
259        &self,
260        id: Uuid,
261        status: &str,
262        reason: &str,
263    ) -> dk_core::Result<()> {
264        sqlx::query(
265            "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3",
266        )
267        .bind(status)
268        .bind(reason)
269        .bind(id)
270        .execute(&self.db)
271        .await?;
272        Ok(())
273    }
274
275    /// Update changeset status with optimistic locking: the transition only
276    /// succeeds when the current state matches one of `expected_states`.
277    /// Returns an error if the row was not updated (state mismatch or not found).
278    pub async fn update_status_if(
279        &self,
280        id: Uuid,
281        new_status: &str,
282        expected_states: &[&str],
283    ) -> dk_core::Result<()> {
284        self.update_status_if_with_reason(id, new_status, expected_states, "").await
285    }
286
287    /// Like `update_status_if` but also records a reason for the transition.
288    pub async fn update_status_if_with_reason(
289        &self,
290        id: Uuid,
291        new_status: &str,
292        expected_states: &[&str],
293        reason: &str,
294    ) -> dk_core::Result<()> {
295        let states: Vec<String> = expected_states.iter().map(|s| s.to_string()).collect();
296        let result = sqlx::query(
297            "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3 AND state = ANY($4)",
298        )
299        .bind(new_status)
300        .bind(reason)
301        .bind(id)
302        .bind(&states)
303        .execute(&self.db)
304        .await?;
305
306        if result.rows_affected() == 0 {
307            return Err(dk_core::Error::Internal(format!(
308                "changeset {} not found or not in expected state (expected one of: {:?})",
309                id, expected_states,
310            )));
311        }
312        Ok(())
313    }
314
315    pub async fn set_merged(&self, id: Uuid, commit_hash: &str) -> dk_core::Result<()> {
316        sqlx::query(
317            "UPDATE changesets SET state = 'merged', reason = 'merge completed', merged_version = $1, merged_at = now(), updated_at = now() WHERE id = $2",
318        )
319        .bind(commit_hash)
320        .bind(id)
321        .execute(&self.db)
322        .await?;
323        Ok(())
324    }
325
326    pub async fn upsert_file(
327        &self,
328        changeset_id: Uuid,
329        file_path: &str,
330        operation: &str,
331        content: Option<&str>,
332    ) -> dk_core::Result<()> {
333        sqlx::query(
334            r#"INSERT INTO changeset_files (changeset_id, file_path, operation, content)
335               VALUES ($1, $2, $3, $4)
336               ON CONFLICT (changeset_id, file_path) DO UPDATE SET
337                   operation = EXCLUDED.operation,
338                   content = EXCLUDED.content"#,
339        )
340        .bind(changeset_id)
341        .bind(file_path)
342        .bind(operation)
343        .bind(content)
344        .execute(&self.db)
345        .await?;
346        Ok(())
347    }
348
349    pub async fn get_files(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFile>> {
350        let rows: Vec<(Uuid, String, String, Option<String>)> = sqlx::query_as(
351            "SELECT changeset_id, file_path, operation, content FROM changeset_files WHERE changeset_id = $1",
352        )
353        .bind(changeset_id)
354        .fetch_all(&self.db)
355        .await?;
356
357        Ok(rows
358            .into_iter()
359            .map(|r| ChangesetFile {
360                changeset_id: r.0,
361                file_path: r.1,
362                operation: r.2,
363                content: r.3,
364            })
365            .collect())
366    }
367
368    /// Lightweight query returning only file metadata (path, operation, size)
369    /// without loading the full content column.
370    pub async fn get_files_metadata(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFileMeta>> {
371        let rows: Vec<(String, String, i64)> = sqlx::query_as(
372            "SELECT file_path, operation, COALESCE(LENGTH(content), 0)::bigint AS size_bytes FROM changeset_files WHERE changeset_id = $1",
373        )
374        .bind(changeset_id)
375        .fetch_all(&self.db)
376        .await?;
377
378        Ok(rows
379            .into_iter()
380            .map(|r| ChangesetFileMeta {
381                file_path: r.0,
382                operation: r.1,
383                size_bytes: r.2,
384            })
385            .collect())
386    }
387
388    pub async fn record_affected_symbol(
389        &self,
390        changeset_id: Uuid,
391        symbol_id: SymbolId,
392        qualified_name: &str,
393    ) -> dk_core::Result<()> {
394        sqlx::query(
395            r#"INSERT INTO changeset_symbols (changeset_id, symbol_id, symbol_qualified_name)
396               VALUES ($1, $2, $3)
397               ON CONFLICT DO NOTHING"#,
398        )
399        .bind(changeset_id)
400        .bind(symbol_id)
401        .bind(qualified_name)
402        .execute(&self.db)
403        .await?;
404        Ok(())
405    }
406
407    pub async fn get_affected_symbols(&self, changeset_id: Uuid) -> dk_core::Result<Vec<(SymbolId, String)>> {
408        let rows: Vec<(Uuid, String)> = sqlx::query_as(
409            "SELECT symbol_id, symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $1",
410        )
411        .bind(changeset_id)
412        .fetch_all(&self.db)
413        .await?;
414        Ok(rows)
415    }
416
417    /// Find changesets that conflict with ours.
418    /// Only considers changesets merged AFTER our base_version —
419    /// i.e. changes the agent didn't know about when it started.
420    pub async fn find_conflicting_changesets(
421        &self,
422        repo_id: RepoId,
423        base_version: &str,
424        my_changeset_id: Uuid,
425    ) -> dk_core::Result<Vec<(Uuid, Vec<String>)>> {
426        let rows: Vec<(Uuid, String)> = sqlx::query_as(
427            r#"SELECT DISTINCT cs.changeset_id, cs.symbol_qualified_name
428               FROM changeset_symbols cs
429               JOIN changesets c ON c.id = cs.changeset_id
430               WHERE c.repo_id = $1
431                 AND c.state = 'merged'
432                 AND c.id != $2
433                 AND c.merged_version IS NOT NULL
434                 AND c.merged_version != $3
435                 AND cs.symbol_qualified_name IN (
436                     SELECT symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $2
437                 )"#,
438        )
439        .bind(repo_id)
440        .bind(my_changeset_id)
441        .bind(base_version)
442        .fetch_all(&self.db)
443        .await?;
444
445        let mut map: std::collections::HashMap<Uuid, Vec<String>> = std::collections::HashMap::new();
446        for (cs_id, sym_name) in rows {
447            map.entry(cs_id).or_default().push(sym_name);
448        }
449        Ok(map.into_iter().collect())
450    }
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456
457    /// Verify the source_branch format produced by `create()`.
458    /// Branch format: `{intent_slug}/{agent_name}`.
459    fn slugify_intent(intent: &str) -> String {
460        let slug: String = intent
461            .to_lowercase()
462            .chars()
463            .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
464            .collect::<String>()
465            .trim_matches('-')
466            .to_string();
467        if slug.len() > 50 {
468            slug[..50].trim_end_matches('-').to_string()
469        } else {
470            slug
471        }
472    }
473
474    #[test]
475    fn source_branch_format_uses_intent_slug() {
476        let intent = "Fix UI bugs";
477        let agent_name = "agent-1";
478        let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
479        assert_eq!(source_branch, "fix-ui-bugs/agent-1");
480    }
481
482    #[test]
483    fn source_branch_format_with_custom_name() {
484        let intent = "Add comments endpoint";
485        let agent_name = "feature-bot";
486        let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
487        assert_eq!(source_branch, "add-comments-endpoint/feature-bot");
488    }
489
490    #[test]
491    fn target_branch_is_main() {
492        // create() hardcodes target_branch to "main"
493        let target_branch = "main";
494        assert_eq!(target_branch, "main");
495    }
496
497    /// Verify that a manually-constructed Changeset (matching the shape
498    /// returned by `create()`) has the correct branch and agent fields.
499    #[test]
500    fn changeset_create_shape_has_correct_branches() {
501        let repo_id = Uuid::new_v4();
502        let session_id = Uuid::new_v4();
503        let agent_id = "test-agent";
504        let intent = "fix all the bugs";
505
506        let source_branch = format!("agent/{}", agent_id);
507        let now = Utc::now();
508
509        let cs = Changeset {
510            id: Uuid::new_v4(),
511            repo_id,
512            number: 1,
513            title: intent.to_string(),
514            intent_summary: Some(intent.to_string()),
515            source_branch: source_branch.clone(),
516            target_branch: "main".to_string(),
517            state: "draft".to_string(),
518            reason: String::new(),
519            session_id: Some(session_id),
520            agent_id: Some(agent_id.to_string()),
521            agent_name: Some(agent_id.to_string()),
522            author_id: None,
523            base_version: Some("abc123".to_string()),
524            merged_version: None,
525            created_at: now,
526            updated_at: now,
527            merged_at: None,
528        };
529
530        assert_eq!(cs.source_branch, "agent/test-agent");
531        assert_eq!(cs.target_branch, "main");
532        assert_eq!(cs.agent_name.as_deref(), Some("test-agent"));
533        assert_eq!(cs.agent_id, cs.agent_name, "agent_name should equal agent_id per create()");
534        assert!(cs.merged_at.is_none());
535        assert!(cs.merged_version.is_none());
536    }
537
538    /// Verify the Changeset struct fields are all accessible and have
539    /// the expected types (compile-time check + runtime assertions).
540    #[test]
541    fn changeset_all_fields_accessible() {
542        let now = Utc::now();
543        let id = Uuid::new_v4();
544        let repo_id = Uuid::new_v4();
545
546        let cs = Changeset {
547            id,
548            repo_id,
549            number: 42,
550            title: "test".to_string(),
551            intent_summary: None,
552            source_branch: "agent/a".to_string(),
553            target_branch: "main".to_string(),
554            state: "draft".to_string(),
555            reason: String::new(),
556            session_id: None,
557            agent_id: None,
558            agent_name: None,
559            author_id: None,
560            base_version: None,
561            merged_version: None,
562            created_at: now,
563            updated_at: now,
564            merged_at: None,
565        };
566
567        assert_eq!(cs.id, id);
568        assert_eq!(cs.repo_id, repo_id);
569        assert_eq!(cs.number, 42);
570        assert_eq!(cs.title, "test");
571        assert!(cs.intent_summary.is_none());
572        assert!(cs.session_id.is_none());
573        assert!(cs.agent_id.is_none());
574        assert!(cs.agent_name.is_none());
575        assert!(cs.author_id.is_none());
576        assert!(cs.base_version.is_none());
577        assert!(cs.merged_version.is_none());
578        assert!(cs.merged_at.is_none());
579    }
580
581    #[test]
582    fn changeset_file_meta_struct() {
583        let meta = ChangesetFileMeta {
584            file_path: "src/main.rs".to_string(),
585            operation: "modify".to_string(),
586            size_bytes: 1024,
587        };
588        assert_eq!(meta.file_path, "src/main.rs");
589        assert_eq!(meta.operation, "modify");
590        assert_eq!(meta.size_bytes, 1024);
591    }
592
593    #[test]
594    fn changeset_file_struct() {
595        let cf = ChangesetFile {
596            changeset_id: Uuid::new_v4(),
597            file_path: "lib.rs".to_string(),
598            operation: "add".to_string(),
599            content: Some("fn main() {}".to_string()),
600        };
601        assert_eq!(cf.file_path, "lib.rs");
602        assert_eq!(cf.operation, "add");
603        assert!(cf.content.is_some());
604    }
605
606    #[test]
607    fn changeset_clone_produces_equal_values() {
608        let now = Utc::now();
609        let cs = Changeset {
610            id: Uuid::new_v4(),
611            repo_id: Uuid::new_v4(),
612            number: 1,
613            title: "clone test".to_string(),
614            intent_summary: Some("intent".to_string()),
615            source_branch: "agent/x".to_string(),
616            target_branch: "main".to_string(),
617            state: "draft".to_string(),
618            reason: String::new(),
619            session_id: None,
620            agent_id: Some("x".to_string()),
621            agent_name: Some("x".to_string()),
622            author_id: None,
623            base_version: None,
624            merged_version: None,
625            created_at: now,
626            updated_at: now,
627            merged_at: None,
628        };
629
630        let cloned = cs.clone();
631        assert_eq!(cs.id, cloned.id);
632        assert_eq!(cs.source_branch, cloned.source_branch);
633        assert_eq!(cs.target_branch, cloned.target_branch);
634        assert_eq!(cs.state, cloned.state);
635    }
636}