1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use dk_core::{RepoId, SymbolId};
6
/// Lifecycle state of a changeset.
///
/// Every variant has a lowercase string form, produced by
/// [`ChangesetState::as_str`] and accepted by [`ChangesetState::parse`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangesetState {
    Draft,
    Submitted,
    Verifying,
    Approved,
    Rejected,
    Merged,
    Closed,
}

impl ChangesetState {
    /// All variants in declaration order; used for the reverse lookup in `parse`.
    const ALL: [Self; 7] = [
        Self::Draft,
        Self::Submitted,
        Self::Verifying,
        Self::Approved,
        Self::Rejected,
        Self::Merged,
        Self::Closed,
    ];

    /// Converts a state name into its variant.
    ///
    /// Matching is exact (case-sensitive); any string that is not one of the
    /// names returned by [`ChangesetState::as_str`] yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }

    /// Returns the lowercase name of this state.
    pub fn as_str(&self) -> &'static str {
        match *self {
            ChangesetState::Draft => "draft",
            ChangesetState::Submitted => "submitted",
            ChangesetState::Verifying => "verifying",
            ChangesetState::Approved => "approved",
            ChangesetState::Rejected => "rejected",
            ChangesetState::Merged => "merged",
            ChangesetState::Closed => "closed",
        }
    }

    /// Reports whether moving from `self` to `target` is allowed.
    ///
    /// Allowed moves:
    /// - any state -> `Closed` (including `Closed` itself)
    /// - `Draft` -> `Submitted`
    /// - `Submitted` -> `Verifying`
    /// - `Verifying` -> `Approved` or `Rejected`
    /// - `Approved` -> `Merged`
    pub fn can_transition_to(&self, target: Self) -> bool {
        // Closing is permitted from everywhere, so it short-circuits the table below.
        target == Self::Closed
            || match self {
                Self::Draft => target == Self::Submitted,
                Self::Submitted => target == Self::Verifying,
                Self::Verifying => target == Self::Approved || target == Self::Rejected,
                Self::Approved => target == Self::Merged,
                Self::Rejected | Self::Merged | Self::Closed => false,
            }
    }
}

/// Displays the state using the same text as [`ChangesetState::as_str`].
impl std::fmt::Display for ChangesetState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = self.as_str();
        f.write_str(name)
    }
}
75
/// A row of the `changesets` table.
///
/// Loaded by `ChangesetStore::get` and built by `ChangesetStore::create`.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct Changeset {
    /// Identifier of the changeset row (the key used by `ChangesetStore::get`).
    pub id: Uuid,
    /// Repository this changeset belongs to.
    pub repo_id: RepoId,
    /// Per-repository number; `create` assigns `MAX(number) + 1` within the repo.
    pub number: i32,
    /// Title; `create` stores the intent text here.
    pub title: String,
    /// Intent summary; `create` stores the same intent text here.
    pub intent_summary: Option<String>,
    /// Source branch; `create` builds it as `"{intent slug}/{agent_name}"`.
    pub source_branch: String,
    /// Target branch; `create` always uses `main`.
    pub target_branch: String,
    /// State name as stored in the database; see `parsed_state` for the typed form.
    pub state: String,
    /// Text recorded alongside the most recent state change.
    pub reason: String,
    /// Session passed to `create`, if any.
    pub session_id: Option<Uuid>,
    /// Agent identifier passed to `create`.
    pub agent_id: Option<String>,
    /// Agent display name passed to `create`; also the suffix of `source_branch`.
    pub agent_name: Option<String>,
    /// Author of the changeset; `create` leaves this unset.
    pub author_id: Option<Uuid>,
    /// Version the changeset was based on, as passed to `create`.
    pub base_version: Option<String>,
    /// Commit hash recorded by `ChangesetStore::set_merged`.
    pub merged_version: Option<String>,
    /// Creation timestamp returned by the database on insert.
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp; the status-update queries set it to `now()`.
    pub updated_at: DateTime<Utc>,
    /// Merge timestamp; set to `now()` by `ChangesetStore::set_merged`.
    pub merged_at: Option<DateTime<Utc>>,
}
97
98impl Changeset {
99 pub fn parsed_state(&self) -> Option<ChangesetState> {
101 ChangesetState::parse(&self.state)
102 }
103
104 pub fn transition(
107 &mut self,
108 target: ChangesetState,
109 reason: impl Into<String>,
110 ) -> dk_core::Result<()> {
111 let current = self.parsed_state().ok_or_else(|| {
112 dk_core::Error::Internal(format!("unknown current state: '{}'", self.state))
113 })?;
114
115 if !current.can_transition_to(target) {
116 return Err(dk_core::Error::InvalidInput(format!(
117 "invalid state transition: '{}' -> '{}'",
118 current, target,
119 )));
120 }
121
122 self.state = target.as_str().to_string();
123 self.reason = reason.into();
124 Ok(())
125 }
126}
127
/// One file entry of a changeset, including its content, as returned by
/// `ChangesetStore::get_files`.
#[derive(Debug, Clone)]
pub struct ChangesetFile {
    /// Changeset this file entry belongs to.
    pub changeset_id: Uuid,
    /// Path of the file; together with `changeset_id` it is the upsert conflict key.
    pub file_path: String,
    /// Operation applied to the file, stored as free-form text.
    /// NOTE(review): the set of valid values is not defined in this file — confirm with callers.
    pub operation: String,
    /// File content, if any was stored.
    pub content: Option<String>,
}
135
/// Lightweight view of a changeset file without its content, as returned by
/// `ChangesetStore::get_files_metadata`.
#[derive(Debug, Clone)]
pub struct ChangesetFileMeta {
    /// Path of the file.
    pub file_path: String,
    /// Operation applied to the file, stored as free-form text.
    pub operation: String,
    /// Size of the stored content as computed by `get_files_metadata`;
    /// 0 when no content is stored.
    pub size_bytes: i64,
}
142
/// Database access layer for changesets and their associated files and symbols.
pub struct ChangesetStore {
    /// Postgres connection pool every query in this store runs against.
    db: PgPool,
}
146
147impl ChangesetStore {
148 pub fn new(db: PgPool) -> Self {
149 Self { db }
150 }
151
152 pub async fn create(
158 &self,
159 repo_id: RepoId,
160 session_id: Option<Uuid>,
161 agent_id: &str,
162 intent: &str,
163 base_version: Option<&str>,
164 agent_name: &str,
165 ) -> dk_core::Result<Changeset> {
166 let intent_slug: String = intent
167 .to_lowercase()
168 .chars()
169 .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
170 .collect::<String>()
171 .trim_matches('-')
172 .to_string();
173 let slug = if intent_slug.len() > 50 {
174 intent_slug[..50].trim_end_matches('-').to_string()
175 } else {
176 intent_slug
177 };
178 let source_branch = format!("{}/{}", slug, agent_name);
179 let target_branch = "main";
180
181 let mut tx = self.db.begin().await?;
182
183 sqlx::query("SELECT pg_advisory_xact_lock(hashtext('changeset:' || $1::text))")
184 .bind(repo_id)
185 .execute(&mut *tx)
186 .await?;
187
188 let row: (Uuid, i32, String, String, DateTime<Utc>, DateTime<Utc>) = sqlx::query_as(
189 r#"INSERT INTO changesets
190 (repo_id, number, title, intent_summary, source_branch, target_branch,
191 state, reason, session_id, agent_id, agent_name, base_version)
192 SELECT $1, COALESCE(MAX(number), 0) + 1, $2, $2, $3, $4,
193 'draft', 'created via agent connect', $5, $6, $7, $8
194 FROM changesets WHERE repo_id = $1
195 RETURNING id, number, state, reason, created_at, updated_at"#,
196 )
197 .bind(repo_id)
198 .bind(intent)
199 .bind(&source_branch)
200 .bind(target_branch)
201 .bind(session_id)
202 .bind(agent_id)
203 .bind(agent_name)
204 .bind(base_version)
205 .fetch_one(&mut *tx)
206 .await?;
207
208 tx.commit().await?;
209
210 Ok(Changeset {
211 id: row.0,
212 repo_id,
213 number: row.1,
214 title: intent.to_string(),
215 intent_summary: Some(intent.to_string()),
216 source_branch,
217 target_branch: target_branch.to_string(),
218 state: row.2,
219 reason: row.3,
220 session_id,
221 agent_id: Some(agent_id.to_string()),
222 agent_name: Some(agent_name.to_string()),
223 author_id: None,
224 base_version: base_version.map(String::from),
225 merged_version: None,
226 created_at: row.4,
227 updated_at: row.5,
228 merged_at: None,
229 })
230 }
231
232 pub async fn get(&self, id: Uuid) -> dk_core::Result<Changeset> {
233 sqlx::query_as::<_, Changeset>(
234 r#"SELECT id, repo_id, number, title, intent_summary,
235 source_branch, target_branch, state, reason,
236 session_id, agent_id, agent_name, author_id,
237 base_version, merged_version,
238 created_at, updated_at, merged_at
239 FROM changesets WHERE id = $1"#,
240 )
241 .bind(id)
242 .fetch_optional(&self.db)
243 .await?
244 .ok_or_else(|| dk_core::Error::Internal(format!("changeset {} not found", id)))
245 }
246
247 pub async fn update_status(&self, id: Uuid, status: &str) -> dk_core::Result<()> {
248 self.update_status_with_reason(id, status, "").await
249 }
250
251 pub async fn update_status_with_reason(
253 &self,
254 id: Uuid,
255 status: &str,
256 reason: &str,
257 ) -> dk_core::Result<()> {
258 sqlx::query(
259 "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3",
260 )
261 .bind(status)
262 .bind(reason)
263 .bind(id)
264 .execute(&self.db)
265 .await?;
266 Ok(())
267 }
268
269 pub async fn update_status_if(
273 &self,
274 id: Uuid,
275 new_status: &str,
276 expected_states: &[&str],
277 ) -> dk_core::Result<()> {
278 self.update_status_if_with_reason(id, new_status, expected_states, "").await
279 }
280
281 pub async fn update_status_if_with_reason(
283 &self,
284 id: Uuid,
285 new_status: &str,
286 expected_states: &[&str],
287 reason: &str,
288 ) -> dk_core::Result<()> {
289 let states: Vec<String> = expected_states.iter().map(|s| s.to_string()).collect();
290 let result = sqlx::query(
291 "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3 AND state = ANY($4)",
292 )
293 .bind(new_status)
294 .bind(reason)
295 .bind(id)
296 .bind(&states)
297 .execute(&self.db)
298 .await?;
299
300 if result.rows_affected() == 0 {
301 return Err(dk_core::Error::Internal(format!(
302 "changeset {} not found or not in expected state (expected one of: {:?})",
303 id, expected_states,
304 )));
305 }
306 Ok(())
307 }
308
309 pub async fn set_merged(&self, id: Uuid, commit_hash: &str) -> dk_core::Result<()> {
310 sqlx::query(
311 "UPDATE changesets SET state = 'merged', reason = 'merge completed', merged_version = $1, merged_at = now(), updated_at = now() WHERE id = $2",
312 )
313 .bind(commit_hash)
314 .bind(id)
315 .execute(&self.db)
316 .await?;
317 Ok(())
318 }
319
320 pub async fn upsert_file(
321 &self,
322 changeset_id: Uuid,
323 file_path: &str,
324 operation: &str,
325 content: Option<&str>,
326 ) -> dk_core::Result<()> {
327 sqlx::query(
328 r#"INSERT INTO changeset_files (changeset_id, file_path, operation, content)
329 VALUES ($1, $2, $3, $4)
330 ON CONFLICT (changeset_id, file_path) DO UPDATE SET
331 operation = EXCLUDED.operation,
332 content = EXCLUDED.content"#,
333 )
334 .bind(changeset_id)
335 .bind(file_path)
336 .bind(operation)
337 .bind(content)
338 .execute(&self.db)
339 .await?;
340 Ok(())
341 }
342
343 pub async fn get_files(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFile>> {
344 let rows: Vec<(Uuid, String, String, Option<String>)> = sqlx::query_as(
345 "SELECT changeset_id, file_path, operation, content FROM changeset_files WHERE changeset_id = $1",
346 )
347 .bind(changeset_id)
348 .fetch_all(&self.db)
349 .await?;
350
351 Ok(rows
352 .into_iter()
353 .map(|r| ChangesetFile {
354 changeset_id: r.0,
355 file_path: r.1,
356 operation: r.2,
357 content: r.3,
358 })
359 .collect())
360 }
361
362 pub async fn get_files_metadata(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFileMeta>> {
365 let rows: Vec<(String, String, i64)> = sqlx::query_as(
366 "SELECT file_path, operation, COALESCE(LENGTH(content), 0)::bigint AS size_bytes FROM changeset_files WHERE changeset_id = $1",
367 )
368 .bind(changeset_id)
369 .fetch_all(&self.db)
370 .await?;
371
372 Ok(rows
373 .into_iter()
374 .map(|r| ChangesetFileMeta {
375 file_path: r.0,
376 operation: r.1,
377 size_bytes: r.2,
378 })
379 .collect())
380 }
381
382 pub async fn record_affected_symbol(
383 &self,
384 changeset_id: Uuid,
385 symbol_id: SymbolId,
386 qualified_name: &str,
387 ) -> dk_core::Result<()> {
388 sqlx::query(
389 r#"INSERT INTO changeset_symbols (changeset_id, symbol_id, symbol_qualified_name)
390 VALUES ($1, $2, $3)
391 ON CONFLICT DO NOTHING"#,
392 )
393 .bind(changeset_id)
394 .bind(symbol_id)
395 .bind(qualified_name)
396 .execute(&self.db)
397 .await?;
398 Ok(())
399 }
400
401 pub async fn get_affected_symbols(&self, changeset_id: Uuid) -> dk_core::Result<Vec<(SymbolId, String)>> {
402 let rows: Vec<(Uuid, String)> = sqlx::query_as(
403 "SELECT symbol_id, symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $1",
404 )
405 .bind(changeset_id)
406 .fetch_all(&self.db)
407 .await?;
408 Ok(rows)
409 }
410
411 pub async fn find_conflicting_changesets(
415 &self,
416 repo_id: RepoId,
417 base_version: &str,
418 my_changeset_id: Uuid,
419 ) -> dk_core::Result<Vec<(Uuid, Vec<String>)>> {
420 let rows: Vec<(Uuid, String)> = sqlx::query_as(
421 r#"SELECT DISTINCT cs.changeset_id, cs.symbol_qualified_name
422 FROM changeset_symbols cs
423 JOIN changesets c ON c.id = cs.changeset_id
424 WHERE c.repo_id = $1
425 AND c.state = 'merged'
426 AND c.id != $2
427 AND c.merged_version IS NOT NULL
428 AND c.merged_version != $3
429 AND cs.symbol_qualified_name IN (
430 SELECT symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $2
431 )"#,
432 )
433 .bind(repo_id)
434 .bind(my_changeset_id)
435 .bind(base_version)
436 .fetch_all(&self.db)
437 .await?;
438
439 let mut map: std::collections::HashMap<Uuid, Vec<String>> = std::collections::HashMap::new();
440 for (cs_id, sym_name) in rows {
441 map.entry(cs_id).or_default().push(sym_name);
442 }
443 Ok(map.into_iter().collect())
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450
451 fn slugify_intent(intent: &str) -> String {
454 let slug: String = intent
455 .to_lowercase()
456 .chars()
457 .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
458 .collect::<String>()
459 .trim_matches('-')
460 .to_string();
461 if slug.len() > 50 {
462 slug[..50].trim_end_matches('-').to_string()
463 } else {
464 slug
465 }
466 }
467
468 #[test]
469 fn source_branch_format_uses_intent_slug() {
470 let intent = "Fix UI bugs";
471 let agent_name = "agent-1";
472 let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
473 assert_eq!(source_branch, "fix-ui-bugs/agent-1");
474 }
475
476 #[test]
477 fn source_branch_format_with_custom_name() {
478 let intent = "Add comments endpoint";
479 let agent_name = "feature-bot";
480 let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
481 assert_eq!(source_branch, "add-comments-endpoint/feature-bot");
482 }
483
484 #[test]
485 fn target_branch_is_main() {
486 let target_branch = "main";
488 assert_eq!(target_branch, "main");
489 }
490
491 #[test]
494 fn changeset_create_shape_has_correct_branches() {
495 let repo_id = Uuid::new_v4();
496 let session_id = Uuid::new_v4();
497 let agent_id = "test-agent";
498 let intent = "fix all the bugs";
499
500 let source_branch = format!("agent/{}", agent_id);
501 let now = Utc::now();
502
503 let cs = Changeset {
504 id: Uuid::new_v4(),
505 repo_id,
506 number: 1,
507 title: intent.to_string(),
508 intent_summary: Some(intent.to_string()),
509 source_branch: source_branch.clone(),
510 target_branch: "main".to_string(),
511 state: "draft".to_string(),
512 reason: String::new(),
513 session_id: Some(session_id),
514 agent_id: Some(agent_id.to_string()),
515 agent_name: Some(agent_id.to_string()),
516 author_id: None,
517 base_version: Some("abc123".to_string()),
518 merged_version: None,
519 created_at: now,
520 updated_at: now,
521 merged_at: None,
522 };
523
524 assert_eq!(cs.source_branch, "agent/test-agent");
525 assert_eq!(cs.target_branch, "main");
526 assert_eq!(cs.agent_name.as_deref(), Some("test-agent"));
527 assert_eq!(cs.agent_id, cs.agent_name, "agent_name should equal agent_id per create()");
528 assert!(cs.merged_at.is_none());
529 assert!(cs.merged_version.is_none());
530 }
531
532 #[test]
535 fn changeset_all_fields_accessible() {
536 let now = Utc::now();
537 let id = Uuid::new_v4();
538 let repo_id = Uuid::new_v4();
539
540 let cs = Changeset {
541 id,
542 repo_id,
543 number: 42,
544 title: "test".to_string(),
545 intent_summary: None,
546 source_branch: "agent/a".to_string(),
547 target_branch: "main".to_string(),
548 state: "draft".to_string(),
549 reason: String::new(),
550 session_id: None,
551 agent_id: None,
552 agent_name: None,
553 author_id: None,
554 base_version: None,
555 merged_version: None,
556 created_at: now,
557 updated_at: now,
558 merged_at: None,
559 };
560
561 assert_eq!(cs.id, id);
562 assert_eq!(cs.repo_id, repo_id);
563 assert_eq!(cs.number, 42);
564 assert_eq!(cs.title, "test");
565 assert!(cs.intent_summary.is_none());
566 assert!(cs.session_id.is_none());
567 assert!(cs.agent_id.is_none());
568 assert!(cs.agent_name.is_none());
569 assert!(cs.author_id.is_none());
570 assert!(cs.base_version.is_none());
571 assert!(cs.merged_version.is_none());
572 assert!(cs.merged_at.is_none());
573 }
574
575 #[test]
576 fn changeset_file_meta_struct() {
577 let meta = ChangesetFileMeta {
578 file_path: "src/main.rs".to_string(),
579 operation: "modify".to_string(),
580 size_bytes: 1024,
581 };
582 assert_eq!(meta.file_path, "src/main.rs");
583 assert_eq!(meta.operation, "modify");
584 assert_eq!(meta.size_bytes, 1024);
585 }
586
587 #[test]
588 fn changeset_file_struct() {
589 let cf = ChangesetFile {
590 changeset_id: Uuid::new_v4(),
591 file_path: "lib.rs".to_string(),
592 operation: "add".to_string(),
593 content: Some("fn main() {}".to_string()),
594 };
595 assert_eq!(cf.file_path, "lib.rs");
596 assert_eq!(cf.operation, "add");
597 assert!(cf.content.is_some());
598 }
599
600 #[test]
601 fn changeset_clone_produces_equal_values() {
602 let now = Utc::now();
603 let cs = Changeset {
604 id: Uuid::new_v4(),
605 repo_id: Uuid::new_v4(),
606 number: 1,
607 title: "clone test".to_string(),
608 intent_summary: Some("intent".to_string()),
609 source_branch: "agent/x".to_string(),
610 target_branch: "main".to_string(),
611 state: "draft".to_string(),
612 reason: String::new(),
613 session_id: None,
614 agent_id: Some("x".to_string()),
615 agent_name: Some("x".to_string()),
616 author_id: None,
617 base_version: None,
618 merged_version: None,
619 created_at: now,
620 updated_at: now,
621 merged_at: None,
622 };
623
624 let cloned = cs.clone();
625 assert_eq!(cs.id, cloned.id);
626 assert_eq!(cs.source_branch, cloned.source_branch);
627 assert_eq!(cs.target_branch, cloned.target_branch);
628 assert_eq!(cs.state, cloned.state);
629 }
630}