Skip to main content

dk_engine/
changeset.rs

1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use dk_core::{RepoId, SymbolId};
6
/// Lifecycle states of a changeset.
///
/// These explicit states replace the former ambiguous "open" state. Each
/// variant maps to a lowercase string (see [`ChangesetState::as_str`]) which
/// is the representation stored in the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangesetState {
    Draft,
    Submitted,
    Verifying,
    Approved,
    Rejected,
    Merged,
    Closed,
}

impl ChangesetState {
    /// Convert a database state string into its typed variant.
    ///
    /// The comparison is exact (lowercase, no trimming); any other input
    /// yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        // Every variant, so the lookup stays in sync with `as_str`.
        const ALL: [ChangesetState; 7] = [
            ChangesetState::Draft,
            ChangesetState::Submitted,
            ChangesetState::Verifying,
            ChangesetState::Approved,
            ChangesetState::Rejected,
            ChangesetState::Merged,
            ChangesetState::Closed,
        ];
        ALL.iter().copied().find(|state| state.as_str() == s)
    }

    /// Lowercase name of the state, as written to the database.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChangesetState::Draft => "draft",
            ChangesetState::Submitted => "submitted",
            ChangesetState::Verifying => "verifying",
            ChangesetState::Approved => "approved",
            ChangesetState::Rejected => "rejected",
            ChangesetState::Merged => "merged",
            ChangesetState::Closed => "closed",
        }
    }

    /// Report whether moving from `self` to `target` is a permitted step.
    ///
    /// Permitted steps:
    /// - draft      -> submitted
    /// - submitted  -> verifying
    /// - verifying  -> approved | rejected
    /// - approved   -> merged
    /// - any        -> closed (including closed -> closed)
    pub fn can_transition_to(&self, target: Self) -> bool {
        match (*self, target) {
            // Closing is allowed no matter where we currently are.
            (_, Self::Closed) => true,
            (Self::Draft, next) => next == Self::Submitted,
            (Self::Submitted, next) => next == Self::Verifying,
            (Self::Verifying, next) => next == Self::Approved || next == Self::Rejected,
            (Self::Approved, next) => next == Self::Merged,
            // Terminal states only accept the `Closed` target handled above.
            (Self::Rejected, _) | (Self::Merged, _) | (Self::Closed, _) => false,
        }
    }
}

/// Formats the state as its database string (same text as `as_str`).
impl std::fmt::Display for ChangesetState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}
75
/// One row of the `changesets` table, as loaded by `ChangesetStore::get`
/// or assembled by `ChangesetStore::create`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct Changeset {
    /// Identifier of the changeset row.
    pub id: Uuid,
    /// Repository this changeset belongs to.
    pub repo_id: RepoId,
    /// Per-repository sequence number; `ChangesetStore::create` assigns
    /// `MAX(number) + 1` among the repo's existing changesets.
    pub number: i32,
    /// Title; `ChangesetStore::create` stores the intent text here.
    pub title: String,
    /// Summary of the intent; `ChangesetStore::create` stores the same
    /// intent text here as in `title`.
    pub intent_summary: Option<String>,
    /// Source branch name; `ChangesetStore::create` builds it as
    /// `<intent-slug>/<agent_name>`.
    pub source_branch: String,
    /// Target branch name; `ChangesetStore::create` always uses `main`.
    pub target_branch: String,
    /// Current state as its database string; use `parsed_state` to get a
    /// typed `ChangesetState`.
    pub state: String,
    /// Reason recorded alongside the most recent state change.
    pub reason: String,
    /// Session the changeset was created in, if any.
    pub session_id: Option<Uuid>,
    /// Identifier of the creating agent, if any.
    pub agent_id: Option<String>,
    /// Human-readable name of the creating agent, if any.
    pub agent_name: Option<String>,
    /// Author identifier; `ChangesetStore::create` leaves this unset.
    pub author_id: Option<Uuid>,
    /// Version the changeset was based on, as supplied at creation.
    pub base_version: Option<String>,
    /// Commit hash recorded by `ChangesetStore::set_merged`.
    pub merged_version: Option<String>,
    /// Creation timestamp as returned by the database on insert.
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp; the store's update queries set it to `now()`.
    pub updated_at: DateTime<Utc>,
    /// Merge timestamp; set to `now()` by `ChangesetStore::set_merged`.
    pub merged_at: Option<DateTime<Utc>>,
}
97
98impl Changeset {
99    /// Parse the current state string into a typed `ChangesetState`.
100    pub fn parsed_state(&self) -> Option<ChangesetState> {
101        ChangesetState::parse(&self.state)
102    }
103
104    /// Validate and perform a state transition, recording the reason.
105    /// Returns an error if the transition is not allowed.
106    pub fn transition(
107        &mut self,
108        target: ChangesetState,
109        reason: impl Into<String>,
110    ) -> dk_core::Result<()> {
111        let current = self.parsed_state().ok_or_else(|| {
112            dk_core::Error::Internal(format!("unknown current state: '{}'", self.state))
113        })?;
114
115        if !current.can_transition_to(target) {
116            return Err(dk_core::Error::InvalidInput(format!(
117                "invalid state transition: '{}' -> '{}'",
118                current, target,
119            )));
120        }
121
122        self.state = target.as_str().to_string();
123        self.reason = reason.into();
124        Ok(())
125    }
126}
127
/// A file entry of a changeset, including its content, as read from the
/// `changeset_files` table by `ChangesetStore::get_files`.
#[derive(Debug, Clone)]
pub struct ChangesetFile {
    /// Changeset this file entry belongs to.
    pub changeset_id: Uuid,
    /// Path of the file; together with `changeset_id` it is the conflict
    /// key used by `ChangesetStore::upsert_file`.
    pub file_path: String,
    /// Operation applied to the file, stored as a free-form string.
    // NOTE(review): the set of valid operation names is not defined in this file.
    pub operation: String,
    /// File content, if any was stored (the column is nullable).
    pub content: Option<String>,
}
135
/// Metadata-only view of a changeset file, produced by
/// `ChangesetStore::get_files_metadata` without fetching the content.
#[derive(Debug, Clone)]
pub struct ChangesetFileMeta {
    /// Path of the file within the changeset.
    pub file_path: String,
    /// Operation applied to the file, stored as a free-form string.
    pub operation: String,
    /// Size of the stored content as computed in SQL by
    /// `get_files_metadata`; 0 when no content is stored.
    pub size_bytes: i64,
}
142
/// Database access layer for changesets, their files and their affected
/// symbols. All queries run against the wrapped Postgres pool.
pub struct ChangesetStore {
    /// Connection pool used for every query issued by this store.
    db: PgPool,
}
146
147impl ChangesetStore {
148    pub fn new(db: PgPool) -> Self {
149        Self { db }
150    }
151
152    /// Create a changeset via the Agent Protocol path.
153    /// Auto-increments the number per repo using an advisory lock.
154    /// Sets `source_branch` to `agent/<agent_name>` and `target_branch` to `main`
155    /// so platform queries that read these NOT NULL columns always succeed.
156    /// `agent_name` is the human-readable name (e.g. "agent-1" or "feature-bot").
157    pub async fn create(
158        &self,
159        repo_id: RepoId,
160        session_id: Option<Uuid>,
161        agent_id: &str,
162        intent: &str,
163        base_version: Option<&str>,
164        agent_name: &str,
165    ) -> dk_core::Result<Changeset> {
166        let intent_slug: String = intent
167            .to_lowercase()
168            .chars()
169            .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
170            .collect::<String>()
171            .trim_matches('-')
172            .to_string();
173        let slug = if intent_slug.len() > 50 {
174            intent_slug[..50].trim_end_matches('-').to_string()
175        } else {
176            intent_slug
177        };
178        let source_branch = format!("{}/{}", slug, agent_name);
179        let target_branch = "main";
180
181        let mut tx = self.db.begin().await?;
182
183        sqlx::query("SELECT pg_advisory_xact_lock(hashtext('changeset:' || $1::text))")
184            .bind(repo_id)
185            .execute(&mut *tx)
186            .await?;
187
188        let row: (Uuid, i32, String, String, DateTime<Utc>, DateTime<Utc>) = sqlx::query_as(
189            r#"INSERT INTO changesets
190                   (repo_id, number, title, intent_summary, source_branch, target_branch,
191                    state, reason, session_id, agent_id, agent_name, base_version)
192               SELECT $1, COALESCE(MAX(number), 0) + 1, $2, $2, $3, $4,
193                    'draft', 'created via agent connect', $5, $6, $7, $8
194               FROM changesets WHERE repo_id = $1
195               RETURNING id, number, state, reason, created_at, updated_at"#,
196        )
197        .bind(repo_id)
198        .bind(intent)
199        .bind(&source_branch)
200        .bind(target_branch)
201        .bind(session_id)
202        .bind(agent_id)
203        .bind(agent_name)
204        .bind(base_version)
205        .fetch_one(&mut *tx)
206        .await?;
207
208        tx.commit().await?;
209
210        Ok(Changeset {
211            id: row.0,
212            repo_id,
213            number: row.1,
214            title: intent.to_string(),
215            intent_summary: Some(intent.to_string()),
216            source_branch,
217            target_branch: target_branch.to_string(),
218            state: row.2,
219            reason: row.3,
220            session_id,
221            agent_id: Some(agent_id.to_string()),
222            agent_name: Some(agent_name.to_string()),
223            author_id: None,
224            base_version: base_version.map(String::from),
225            merged_version: None,
226            created_at: row.4,
227            updated_at: row.5,
228            merged_at: None,
229        })
230    }
231
232    pub async fn get(&self, id: Uuid) -> dk_core::Result<Changeset> {
233        sqlx::query_as::<_, Changeset>(
234            r#"SELECT id, repo_id, number, title, intent_summary,
235                      source_branch, target_branch, state, reason,
236                      session_id, agent_id, agent_name, author_id,
237                      base_version, merged_version,
238                      created_at, updated_at, merged_at
239               FROM changesets WHERE id = $1"#,
240        )
241        .bind(id)
242        .fetch_optional(&self.db)
243        .await?
244        .ok_or_else(|| dk_core::Error::Internal(format!("changeset {} not found", id)))
245    }
246
247    pub async fn update_status(&self, id: Uuid, status: &str) -> dk_core::Result<()> {
248        self.update_status_with_reason(id, status, "").await
249    }
250
251    /// Update changeset status and record the reason for the transition.
252    pub async fn update_status_with_reason(
253        &self,
254        id: Uuid,
255        status: &str,
256        reason: &str,
257    ) -> dk_core::Result<()> {
258        sqlx::query(
259            "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3",
260        )
261        .bind(status)
262        .bind(reason)
263        .bind(id)
264        .execute(&self.db)
265        .await?;
266        Ok(())
267    }
268
269    /// Update changeset status with optimistic locking: the transition only
270    /// succeeds when the current state matches one of `expected_states`.
271    /// Returns an error if the row was not updated (state mismatch or not found).
272    pub async fn update_status_if(
273        &self,
274        id: Uuid,
275        new_status: &str,
276        expected_states: &[&str],
277    ) -> dk_core::Result<()> {
278        self.update_status_if_with_reason(id, new_status, expected_states, "").await
279    }
280
281    /// Like `update_status_if` but also records a reason for the transition.
282    pub async fn update_status_if_with_reason(
283        &self,
284        id: Uuid,
285        new_status: &str,
286        expected_states: &[&str],
287        reason: &str,
288    ) -> dk_core::Result<()> {
289        let states: Vec<String> = expected_states.iter().map(|s| s.to_string()).collect();
290        let result = sqlx::query(
291            "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3 AND state = ANY($4)",
292        )
293        .bind(new_status)
294        .bind(reason)
295        .bind(id)
296        .bind(&states)
297        .execute(&self.db)
298        .await?;
299
300        if result.rows_affected() == 0 {
301            return Err(dk_core::Error::Internal(format!(
302                "changeset {} not found or not in expected state (expected one of: {:?})",
303                id, expected_states,
304            )));
305        }
306        Ok(())
307    }
308
309    pub async fn set_merged(&self, id: Uuid, commit_hash: &str) -> dk_core::Result<()> {
310        sqlx::query(
311            "UPDATE changesets SET state = 'merged', reason = 'merge completed', merged_version = $1, merged_at = now(), updated_at = now() WHERE id = $2",
312        )
313        .bind(commit_hash)
314        .bind(id)
315        .execute(&self.db)
316        .await?;
317        Ok(())
318    }
319
320    pub async fn upsert_file(
321        &self,
322        changeset_id: Uuid,
323        file_path: &str,
324        operation: &str,
325        content: Option<&str>,
326    ) -> dk_core::Result<()> {
327        sqlx::query(
328            r#"INSERT INTO changeset_files (changeset_id, file_path, operation, content)
329               VALUES ($1, $2, $3, $4)
330               ON CONFLICT (changeset_id, file_path) DO UPDATE SET
331                   operation = EXCLUDED.operation,
332                   content = EXCLUDED.content"#,
333        )
334        .bind(changeset_id)
335        .bind(file_path)
336        .bind(operation)
337        .bind(content)
338        .execute(&self.db)
339        .await?;
340        Ok(())
341    }
342
343    pub async fn get_files(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFile>> {
344        let rows: Vec<(Uuid, String, String, Option<String>)> = sqlx::query_as(
345            "SELECT changeset_id, file_path, operation, content FROM changeset_files WHERE changeset_id = $1",
346        )
347        .bind(changeset_id)
348        .fetch_all(&self.db)
349        .await?;
350
351        Ok(rows
352            .into_iter()
353            .map(|r| ChangesetFile {
354                changeset_id: r.0,
355                file_path: r.1,
356                operation: r.2,
357                content: r.3,
358            })
359            .collect())
360    }
361
362    /// Lightweight query returning only file metadata (path, operation, size)
363    /// without loading the full content column.
364    pub async fn get_files_metadata(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFileMeta>> {
365        let rows: Vec<(String, String, i64)> = sqlx::query_as(
366            "SELECT file_path, operation, COALESCE(LENGTH(content), 0)::bigint AS size_bytes FROM changeset_files WHERE changeset_id = $1",
367        )
368        .bind(changeset_id)
369        .fetch_all(&self.db)
370        .await?;
371
372        Ok(rows
373            .into_iter()
374            .map(|r| ChangesetFileMeta {
375                file_path: r.0,
376                operation: r.1,
377                size_bytes: r.2,
378            })
379            .collect())
380    }
381
382    pub async fn record_affected_symbol(
383        &self,
384        changeset_id: Uuid,
385        symbol_id: SymbolId,
386        qualified_name: &str,
387    ) -> dk_core::Result<()> {
388        sqlx::query(
389            r#"INSERT INTO changeset_symbols (changeset_id, symbol_id, symbol_qualified_name)
390               VALUES ($1, $2, $3)
391               ON CONFLICT DO NOTHING"#,
392        )
393        .bind(changeset_id)
394        .bind(symbol_id)
395        .bind(qualified_name)
396        .execute(&self.db)
397        .await?;
398        Ok(())
399    }
400
401    pub async fn get_affected_symbols(&self, changeset_id: Uuid) -> dk_core::Result<Vec<(SymbolId, String)>> {
402        let rows: Vec<(Uuid, String)> = sqlx::query_as(
403            "SELECT symbol_id, symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $1",
404        )
405        .bind(changeset_id)
406        .fetch_all(&self.db)
407        .await?;
408        Ok(rows)
409    }
410
411    /// Find changesets that conflict with ours.
412    /// Only considers changesets merged AFTER our base_version —
413    /// i.e. changes the agent didn't know about when it started.
414    pub async fn find_conflicting_changesets(
415        &self,
416        repo_id: RepoId,
417        base_version: &str,
418        my_changeset_id: Uuid,
419    ) -> dk_core::Result<Vec<(Uuid, Vec<String>)>> {
420        let rows: Vec<(Uuid, String)> = sqlx::query_as(
421            r#"SELECT DISTINCT cs.changeset_id, cs.symbol_qualified_name
422               FROM changeset_symbols cs
423               JOIN changesets c ON c.id = cs.changeset_id
424               WHERE c.repo_id = $1
425                 AND c.state = 'merged'
426                 AND c.id != $2
427                 AND c.merged_version IS NOT NULL
428                 AND c.merged_version != $3
429                 AND cs.symbol_qualified_name IN (
430                     SELECT symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $2
431                 )"#,
432        )
433        .bind(repo_id)
434        .bind(my_changeset_id)
435        .bind(base_version)
436        .fetch_all(&self.db)
437        .await?;
438
439        let mut map: std::collections::HashMap<Uuid, Vec<String>> = std::collections::HashMap::new();
440        for (cs_id, sym_name) in rows {
441            map.entry(cs_id).or_default().push(sym_name);
442        }
443        Ok(map.into_iter().collect())
444    }
445}
446
// Unit tests that run without a database: they exercise the branch-name
// formatting and the plain data structs only. `ChangesetStore` itself is not
// called here.
#[cfg(test)]
mod tests {
    use super::*;

    /// Test-local copy of the intent-slug logic that `create()` uses to build
    /// the source branch (format: `{intent_slug}/{agent_name}`).
    /// `create()` itself is not invoked by these tests.
    fn slugify_intent(intent: &str) -> String {
        let slug: String = intent
            .to_lowercase()
            .chars()
            .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
            .collect::<String>()
            .trim_matches('-')
            .to_string();
        // Cap the slug at 50 bytes, then drop any dash left dangling at the cut.
        if slug.len() > 50 {
            slug[..50].trim_end_matches('-').to_string()
        } else {
            slug
        }
    }

    /// Spaces in the intent become dashes and the text is lowercased.
    #[test]
    fn source_branch_format_uses_intent_slug() {
        let intent = "Fix UI bugs";
        let agent_name = "agent-1";
        let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
        assert_eq!(source_branch, "fix-ui-bugs/agent-1");
    }

    /// The agent name is appended verbatim after the slug.
    #[test]
    fn source_branch_format_with_custom_name() {
        let intent = "Add comments endpoint";
        let agent_name = "feature-bot";
        let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
        assert_eq!(source_branch, "add-comments-endpoint/feature-bot");
    }

    /// Documents the hardcoded target branch; compares a local literal only
    /// and does not call `create()`.
    #[test]
    fn target_branch_is_main() {
        // create() hardcodes target_branch to "main"
        let target_branch = "main";
        assert_eq!(target_branch, "main");
    }

    /// Verify that a manually-constructed Changeset has the branch and agent
    /// fields it was built with.
    ///
    /// NOTE(review): this literal uses an `agent/<agent_id>` source branch and
    /// identical `agent_id`/`agent_name`, whereas `create()` builds
    /// `<intent_slug>/<agent_name>` and takes the two values as separate
    /// arguments; the test checks the struct literal, not `create()`.
    #[test]
    fn changeset_create_shape_has_correct_branches() {
        let repo_id = Uuid::new_v4();
        let session_id = Uuid::new_v4();
        let agent_id = "test-agent";
        let intent = "fix all the bugs";

        let source_branch = format!("agent/{}", agent_id);
        let now = Utc::now();

        let cs = Changeset {
            id: Uuid::new_v4(),
            repo_id,
            number: 1,
            title: intent.to_string(),
            intent_summary: Some(intent.to_string()),
            source_branch: source_branch.clone(),
            target_branch: "main".to_string(),
            state: "draft".to_string(),
            reason: String::new(),
            session_id: Some(session_id),
            agent_id: Some(agent_id.to_string()),
            agent_name: Some(agent_id.to_string()),
            author_id: None,
            base_version: Some("abc123".to_string()),
            merged_version: None,
            created_at: now,
            updated_at: now,
            merged_at: None,
        };

        assert_eq!(cs.source_branch, "agent/test-agent");
        assert_eq!(cs.target_branch, "main");
        assert_eq!(cs.agent_name.as_deref(), Some("test-agent"));
        assert_eq!(cs.agent_id, cs.agent_name, "agent_name should equal agent_id per create()");
        assert!(cs.merged_at.is_none());
        assert!(cs.merged_version.is_none());
    }

    /// Verify the Changeset struct fields are all accessible and have
    /// the expected types (compile-time check + runtime assertions).
    #[test]
    fn changeset_all_fields_accessible() {
        let now = Utc::now();
        let id = Uuid::new_v4();
        let repo_id = Uuid::new_v4();

        let cs = Changeset {
            id,
            repo_id,
            number: 42,
            title: "test".to_string(),
            intent_summary: None,
            source_branch: "agent/a".to_string(),
            target_branch: "main".to_string(),
            state: "draft".to_string(),
            reason: String::new(),
            session_id: None,
            agent_id: None,
            agent_name: None,
            author_id: None,
            base_version: None,
            merged_version: None,
            created_at: now,
            updated_at: now,
            merged_at: None,
        };

        assert_eq!(cs.id, id);
        assert_eq!(cs.repo_id, repo_id);
        assert_eq!(cs.number, 42);
        assert_eq!(cs.title, "test");
        assert!(cs.intent_summary.is_none());
        assert!(cs.session_id.is_none());
        assert!(cs.agent_id.is_none());
        assert!(cs.agent_name.is_none());
        assert!(cs.author_id.is_none());
        assert!(cs.base_version.is_none());
        assert!(cs.merged_version.is_none());
        assert!(cs.merged_at.is_none());
    }

    /// `ChangesetFileMeta` can be built and its fields read back.
    #[test]
    fn changeset_file_meta_struct() {
        let meta = ChangesetFileMeta {
            file_path: "src/main.rs".to_string(),
            operation: "modify".to_string(),
            size_bytes: 1024,
        };
        assert_eq!(meta.file_path, "src/main.rs");
        assert_eq!(meta.operation, "modify");
        assert_eq!(meta.size_bytes, 1024);
    }

    /// `ChangesetFile` can be built and its fields read back.
    #[test]
    fn changeset_file_struct() {
        let cf = ChangesetFile {
            changeset_id: Uuid::new_v4(),
            file_path: "lib.rs".to_string(),
            operation: "add".to_string(),
            content: Some("fn main() {}".to_string()),
        };
        assert_eq!(cf.file_path, "lib.rs");
        assert_eq!(cf.operation, "add");
        assert!(cf.content.is_some());
    }

    /// Cloning a `Changeset` preserves the id, branches and state.
    #[test]
    fn changeset_clone_produces_equal_values() {
        let now = Utc::now();
        let cs = Changeset {
            id: Uuid::new_v4(),
            repo_id: Uuid::new_v4(),
            number: 1,
            title: "clone test".to_string(),
            intent_summary: Some("intent".to_string()),
            source_branch: "agent/x".to_string(),
            target_branch: "main".to_string(),
            state: "draft".to_string(),
            reason: String::new(),
            session_id: None,
            agent_id: Some("x".to_string()),
            agent_name: Some("x".to_string()),
            author_id: None,
            base_version: None,
            merged_version: None,
            created_at: now,
            updated_at: now,
            merged_at: None,
        };

        let cloned = cs.clone();
        assert_eq!(cs.id, cloned.id);
        assert_eq!(cs.source_branch, cloned.source_branch);
        assert_eq!(cs.target_branch, cloned.target_branch);
        assert_eq!(cs.state, cloned.state);
    }
}
630}