1use chrono::{DateTime, Utc};
2use sqlx::PgPool;
3use uuid::Uuid;
4
5use dk_core::{RepoId, SymbolId};
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9pub enum ChangesetState {
10 Draft,
11 Submitted,
12 Verifying,
13 Approved,
14 Rejected,
15 Merged,
16 Closed,
17}
18
19impl ChangesetState {
20 pub fn parse(s: &str) -> Option<Self> {
22 match s {
23 "draft" => Some(Self::Draft),
24 "submitted" => Some(Self::Submitted),
25 "verifying" => Some(Self::Verifying),
26 "approved" => Some(Self::Approved),
27 "rejected" => Some(Self::Rejected),
28 "merged" => Some(Self::Merged),
29 "closed" => Some(Self::Closed),
30 _ => None,
31 }
32 }
33
34 pub fn as_str(&self) -> &'static str {
36 match self {
37 Self::Draft => "draft",
38 Self::Submitted => "submitted",
39 Self::Verifying => "verifying",
40 Self::Approved => "approved",
41 Self::Rejected => "rejected",
42 Self::Merged => "merged",
43 Self::Closed => "closed",
44 }
45 }
46
47 pub fn can_transition_to(&self, target: Self) -> bool {
56 if target == Self::Closed {
57 return true;
58 }
59 matches!(
60 (self, target),
61 (Self::Draft, Self::Submitted)
62 | (Self::Submitted, Self::Verifying)
63 | (Self::Verifying, Self::Approved)
64 | (Self::Verifying, Self::Rejected)
65 | (Self::Approved, Self::Merged)
66 )
67 }
68}
69
70impl std::fmt::Display for ChangesetState {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.write_str(self.as_str())
73 }
74}
75
76#[derive(Debug, Clone, sqlx::FromRow)]
77pub struct Changeset {
78 pub id: Uuid,
79 pub repo_id: RepoId,
80 pub number: i32,
81 pub title: String,
82 pub intent_summary: Option<String>,
83 pub source_branch: String,
84 pub target_branch: String,
85 pub state: String,
86 pub reason: String,
87 pub session_id: Option<Uuid>,
88 pub agent_id: Option<String>,
89 pub agent_name: Option<String>,
90 pub author_id: Option<Uuid>,
91 pub base_version: Option<String>,
92 pub merged_version: Option<String>,
93 pub created_at: DateTime<Utc>,
94 pub updated_at: DateTime<Utc>,
95 pub merged_at: Option<DateTime<Utc>>,
96}
97
98impl Changeset {
99 pub fn parsed_state(&self) -> Option<ChangesetState> {
101 ChangesetState::parse(&self.state)
102 }
103
104 pub fn transition(
107 &mut self,
108 target: ChangesetState,
109 reason: impl Into<String>,
110 ) -> dk_core::Result<()> {
111 let current = self.parsed_state().ok_or_else(|| {
112 dk_core::Error::Internal(format!("unknown current state: '{}'", self.state))
113 })?;
114
115 if !current.can_transition_to(target) {
116 return Err(dk_core::Error::InvalidInput(format!(
117 "invalid state transition: '{}' -> '{}'",
118 current, target,
119 )));
120 }
121
122 self.state = target.as_str().to_string();
123 self.reason = reason.into();
124 Ok(())
125 }
126}
127
128#[derive(Debug, Clone)]
129pub struct ChangesetFile {
130 pub changeset_id: Uuid,
131 pub file_path: String,
132 pub operation: String,
133 pub content: Option<String>,
134}
135
136#[derive(Debug, Clone)]
137pub struct ChangesetFileMeta {
138 pub file_path: String,
139 pub operation: String,
140 pub size_bytes: i64,
141}
142
143pub struct ChangesetStore {
144 db: PgPool,
145}
146
147impl ChangesetStore {
148 pub fn new(db: PgPool) -> Self {
149 Self { db }
150 }
151
152 pub async fn create(
158 &self,
159 repo_id: RepoId,
160 session_id: Option<Uuid>,
161 agent_id: &str,
162 intent: &str,
163 base_version: Option<&str>,
164 agent_name: &str,
165 ) -> dk_core::Result<Changeset> {
166 let intent_slug: String = intent
167 .to_lowercase()
168 .chars()
169 .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
170 .collect::<String>()
171 .trim_matches('-')
172 .to_string();
173 let slug = if intent_slug.len() > 50 {
174 let cut = intent_slug
175 .char_indices()
176 .take_while(|(i, _)| *i < 50)
177 .last()
178 .map(|(i, c)| i + c.len_utf8())
179 .unwrap_or(0);
180 intent_slug[..cut].trim_end_matches('-').to_string()
181 } else {
182 intent_slug
183 };
184 let source_branch = format!("{}/{}", slug, agent_name);
185 let target_branch = "main";
186
187 let mut tx = self.db.begin().await?;
188
189 sqlx::query("SELECT pg_advisory_xact_lock(hashtext('changeset:' || $1::text))")
190 .bind(repo_id)
191 .execute(&mut *tx)
192 .await?;
193
194 let row: (Uuid, i32, String, String, DateTime<Utc>, DateTime<Utc>) = sqlx::query_as(
195 r#"INSERT INTO changesets
196 (repo_id, number, title, intent_summary, source_branch, target_branch,
197 state, reason, session_id, agent_id, agent_name, base_version)
198 SELECT $1, COALESCE(MAX(number), 0) + 1, $2, $2, $3, $4,
199 'draft', 'created via agent connect', $5, $6, $7, $8
200 FROM changesets WHERE repo_id = $1
201 RETURNING id, number, state, reason, created_at, updated_at"#,
202 )
203 .bind(repo_id)
204 .bind(intent)
205 .bind(&source_branch)
206 .bind(target_branch)
207 .bind(session_id)
208 .bind(agent_id)
209 .bind(agent_name)
210 .bind(base_version)
211 .fetch_one(&mut *tx)
212 .await?;
213
214 tx.commit().await?;
215
216 Ok(Changeset {
217 id: row.0,
218 repo_id,
219 number: row.1,
220 title: intent.to_string(),
221 intent_summary: Some(intent.to_string()),
222 source_branch,
223 target_branch: target_branch.to_string(),
224 state: row.2,
225 reason: row.3,
226 session_id,
227 agent_id: Some(agent_id.to_string()),
228 agent_name: Some(agent_name.to_string()),
229 author_id: None,
230 base_version: base_version.map(String::from),
231 merged_version: None,
232 created_at: row.4,
233 updated_at: row.5,
234 merged_at: None,
235 })
236 }
237
238 pub async fn get(&self, id: Uuid) -> dk_core::Result<Changeset> {
239 sqlx::query_as::<_, Changeset>(
240 r#"SELECT id, repo_id, number, title, intent_summary,
241 source_branch, target_branch, state, reason,
242 session_id, agent_id, agent_name, author_id,
243 base_version, merged_version,
244 created_at, updated_at, merged_at
245 FROM changesets WHERE id = $1"#,
246 )
247 .bind(id)
248 .fetch_optional(&self.db)
249 .await?
250 .ok_or_else(|| dk_core::Error::Internal(format!("changeset {} not found", id)))
251 }
252
253 pub async fn update_status(&self, id: Uuid, status: &str) -> dk_core::Result<()> {
254 self.update_status_with_reason(id, status, "").await
255 }
256
257 pub async fn update_status_with_reason(
259 &self,
260 id: Uuid,
261 status: &str,
262 reason: &str,
263 ) -> dk_core::Result<()> {
264 sqlx::query(
265 "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3",
266 )
267 .bind(status)
268 .bind(reason)
269 .bind(id)
270 .execute(&self.db)
271 .await?;
272 Ok(())
273 }
274
275 pub async fn update_status_if(
279 &self,
280 id: Uuid,
281 new_status: &str,
282 expected_states: &[&str],
283 ) -> dk_core::Result<()> {
284 self.update_status_if_with_reason(id, new_status, expected_states, "").await
285 }
286
287 pub async fn update_status_if_with_reason(
289 &self,
290 id: Uuid,
291 new_status: &str,
292 expected_states: &[&str],
293 reason: &str,
294 ) -> dk_core::Result<()> {
295 let states: Vec<String> = expected_states.iter().map(|s| s.to_string()).collect();
296 let result = sqlx::query(
297 "UPDATE changesets SET state = $1, reason = $2, updated_at = now() WHERE id = $3 AND state = ANY($4)",
298 )
299 .bind(new_status)
300 .bind(reason)
301 .bind(id)
302 .bind(&states)
303 .execute(&self.db)
304 .await?;
305
306 if result.rows_affected() == 0 {
307 return Err(dk_core::Error::Internal(format!(
308 "changeset {} not found or not in expected state (expected one of: {:?})",
309 id, expected_states,
310 )));
311 }
312 Ok(())
313 }
314
315 pub async fn set_merged(&self, id: Uuid, commit_hash: &str) -> dk_core::Result<()> {
316 sqlx::query(
317 "UPDATE changesets SET state = 'merged', reason = 'merge completed', merged_version = $1, merged_at = now(), updated_at = now() WHERE id = $2",
318 )
319 .bind(commit_hash)
320 .bind(id)
321 .execute(&self.db)
322 .await?;
323 Ok(())
324 }
325
326 pub async fn upsert_file(
327 &self,
328 changeset_id: Uuid,
329 file_path: &str,
330 operation: &str,
331 content: Option<&str>,
332 ) -> dk_core::Result<()> {
333 sqlx::query(
334 r#"INSERT INTO changeset_files (changeset_id, file_path, operation, content)
335 VALUES ($1, $2, $3, $4)
336 ON CONFLICT (changeset_id, file_path) DO UPDATE SET
337 operation = EXCLUDED.operation,
338 content = EXCLUDED.content"#,
339 )
340 .bind(changeset_id)
341 .bind(file_path)
342 .bind(operation)
343 .bind(content)
344 .execute(&self.db)
345 .await?;
346 Ok(())
347 }
348
349 pub async fn get_files(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFile>> {
350 let rows: Vec<(Uuid, String, String, Option<String>)> = sqlx::query_as(
351 "SELECT changeset_id, file_path, operation, content FROM changeset_files WHERE changeset_id = $1",
352 )
353 .bind(changeset_id)
354 .fetch_all(&self.db)
355 .await?;
356
357 Ok(rows
358 .into_iter()
359 .map(|r| ChangesetFile {
360 changeset_id: r.0,
361 file_path: r.1,
362 operation: r.2,
363 content: r.3,
364 })
365 .collect())
366 }
367
368 pub async fn get_files_metadata(&self, changeset_id: Uuid) -> dk_core::Result<Vec<ChangesetFileMeta>> {
371 let rows: Vec<(String, String, i64)> = sqlx::query_as(
372 "SELECT file_path, operation, COALESCE(LENGTH(content), 0)::bigint AS size_bytes FROM changeset_files WHERE changeset_id = $1",
373 )
374 .bind(changeset_id)
375 .fetch_all(&self.db)
376 .await?;
377
378 Ok(rows
379 .into_iter()
380 .map(|r| ChangesetFileMeta {
381 file_path: r.0,
382 operation: r.1,
383 size_bytes: r.2,
384 })
385 .collect())
386 }
387
388 pub async fn record_affected_symbol(
389 &self,
390 changeset_id: Uuid,
391 symbol_id: SymbolId,
392 qualified_name: &str,
393 ) -> dk_core::Result<()> {
394 sqlx::query(
395 r#"INSERT INTO changeset_symbols (changeset_id, symbol_id, symbol_qualified_name)
396 VALUES ($1, $2, $3)
397 ON CONFLICT DO NOTHING"#,
398 )
399 .bind(changeset_id)
400 .bind(symbol_id)
401 .bind(qualified_name)
402 .execute(&self.db)
403 .await?;
404 Ok(())
405 }
406
407 pub async fn get_affected_symbols(&self, changeset_id: Uuid) -> dk_core::Result<Vec<(SymbolId, String)>> {
408 let rows: Vec<(Uuid, String)> = sqlx::query_as(
409 "SELECT symbol_id, symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $1",
410 )
411 .bind(changeset_id)
412 .fetch_all(&self.db)
413 .await?;
414 Ok(rows)
415 }
416
417 pub async fn find_conflicting_changesets(
421 &self,
422 repo_id: RepoId,
423 base_version: &str,
424 my_changeset_id: Uuid,
425 ) -> dk_core::Result<Vec<(Uuid, Vec<String>)>> {
426 let rows: Vec<(Uuid, String)> = sqlx::query_as(
427 r#"SELECT DISTINCT cs.changeset_id, cs.symbol_qualified_name
428 FROM changeset_symbols cs
429 JOIN changesets c ON c.id = cs.changeset_id
430 WHERE c.repo_id = $1
431 AND c.state = 'merged'
432 AND c.id != $2
433 AND c.merged_version IS NOT NULL
434 AND c.merged_version != $3
435 AND cs.symbol_qualified_name IN (
436 SELECT symbol_qualified_name FROM changeset_symbols WHERE changeset_id = $2
437 )"#,
438 )
439 .bind(repo_id)
440 .bind(my_changeset_id)
441 .bind(base_version)
442 .fetch_all(&self.db)
443 .await?;
444
445 let mut map: std::collections::HashMap<Uuid, Vec<String>> = std::collections::HashMap::new();
446 for (cs_id, sym_name) in rows {
447 map.entry(cs_id).or_default().push(sym_name);
448 }
449 Ok(map.into_iter().collect())
450 }
451}
452
453#[cfg(test)]
454mod tests {
455 use super::*;
456
457 fn slugify_intent(intent: &str) -> String {
460 let slug: String = intent
461 .to_lowercase()
462 .chars()
463 .map(|c| if c.is_alphanumeric() || c == '-' { c } else { '-' })
464 .collect::<String>()
465 .trim_matches('-')
466 .to_string();
467 if slug.len() > 50 {
468 slug[..50].trim_end_matches('-').to_string()
469 } else {
470 slug
471 }
472 }
473
474 #[test]
475 fn source_branch_format_uses_intent_slug() {
476 let intent = "Fix UI bugs";
477 let agent_name = "agent-1";
478 let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
479 assert_eq!(source_branch, "fix-ui-bugs/agent-1");
480 }
481
482 #[test]
483 fn source_branch_format_with_custom_name() {
484 let intent = "Add comments endpoint";
485 let agent_name = "feature-bot";
486 let source_branch = format!("{}/{}", slugify_intent(intent), agent_name);
487 assert_eq!(source_branch, "add-comments-endpoint/feature-bot");
488 }
489
490 #[test]
491 fn target_branch_is_main() {
492 let target_branch = "main";
494 assert_eq!(target_branch, "main");
495 }
496
497 #[test]
500 fn changeset_create_shape_has_correct_branches() {
501 let repo_id = Uuid::new_v4();
502 let session_id = Uuid::new_v4();
503 let agent_id = "test-agent";
504 let intent = "fix all the bugs";
505
506 let source_branch = format!("agent/{}", agent_id);
507 let now = Utc::now();
508
509 let cs = Changeset {
510 id: Uuid::new_v4(),
511 repo_id,
512 number: 1,
513 title: intent.to_string(),
514 intent_summary: Some(intent.to_string()),
515 source_branch: source_branch.clone(),
516 target_branch: "main".to_string(),
517 state: "draft".to_string(),
518 reason: String::new(),
519 session_id: Some(session_id),
520 agent_id: Some(agent_id.to_string()),
521 agent_name: Some(agent_id.to_string()),
522 author_id: None,
523 base_version: Some("abc123".to_string()),
524 merged_version: None,
525 created_at: now,
526 updated_at: now,
527 merged_at: None,
528 };
529
530 assert_eq!(cs.source_branch, "agent/test-agent");
531 assert_eq!(cs.target_branch, "main");
532 assert_eq!(cs.agent_name.as_deref(), Some("test-agent"));
533 assert_eq!(cs.agent_id, cs.agent_name, "agent_name should equal agent_id per create()");
534 assert!(cs.merged_at.is_none());
535 assert!(cs.merged_version.is_none());
536 }
537
538 #[test]
541 fn changeset_all_fields_accessible() {
542 let now = Utc::now();
543 let id = Uuid::new_v4();
544 let repo_id = Uuid::new_v4();
545
546 let cs = Changeset {
547 id,
548 repo_id,
549 number: 42,
550 title: "test".to_string(),
551 intent_summary: None,
552 source_branch: "agent/a".to_string(),
553 target_branch: "main".to_string(),
554 state: "draft".to_string(),
555 reason: String::new(),
556 session_id: None,
557 agent_id: None,
558 agent_name: None,
559 author_id: None,
560 base_version: None,
561 merged_version: None,
562 created_at: now,
563 updated_at: now,
564 merged_at: None,
565 };
566
567 assert_eq!(cs.id, id);
568 assert_eq!(cs.repo_id, repo_id);
569 assert_eq!(cs.number, 42);
570 assert_eq!(cs.title, "test");
571 assert!(cs.intent_summary.is_none());
572 assert!(cs.session_id.is_none());
573 assert!(cs.agent_id.is_none());
574 assert!(cs.agent_name.is_none());
575 assert!(cs.author_id.is_none());
576 assert!(cs.base_version.is_none());
577 assert!(cs.merged_version.is_none());
578 assert!(cs.merged_at.is_none());
579 }
580
581 #[test]
582 fn changeset_file_meta_struct() {
583 let meta = ChangesetFileMeta {
584 file_path: "src/main.rs".to_string(),
585 operation: "modify".to_string(),
586 size_bytes: 1024,
587 };
588 assert_eq!(meta.file_path, "src/main.rs");
589 assert_eq!(meta.operation, "modify");
590 assert_eq!(meta.size_bytes, 1024);
591 }
592
593 #[test]
594 fn changeset_file_struct() {
595 let cf = ChangesetFile {
596 changeset_id: Uuid::new_v4(),
597 file_path: "lib.rs".to_string(),
598 operation: "add".to_string(),
599 content: Some("fn main() {}".to_string()),
600 };
601 assert_eq!(cf.file_path, "lib.rs");
602 assert_eq!(cf.operation, "add");
603 assert!(cf.content.is_some());
604 }
605
606 #[test]
607 fn changeset_clone_produces_equal_values() {
608 let now = Utc::now();
609 let cs = Changeset {
610 id: Uuid::new_v4(),
611 repo_id: Uuid::new_v4(),
612 number: 1,
613 title: "clone test".to_string(),
614 intent_summary: Some("intent".to_string()),
615 source_branch: "agent/x".to_string(),
616 target_branch: "main".to_string(),
617 state: "draft".to_string(),
618 reason: String::new(),
619 session_id: None,
620 agent_id: Some("x".to_string()),
621 agent_name: Some("x".to_string()),
622 author_id: None,
623 base_version: None,
624 merged_version: None,
625 created_at: now,
626 updated_at: now,
627 merged_at: None,
628 };
629
630 let cloned = cs.clone();
631 assert_eq!(cs.id, cloned.id);
632 assert_eq!(cs.source_branch, cloned.source_branch);
633 assert_eq!(cs.target_branch, cloned.target_branch);
634 assert_eq!(cs.state, cloned.state);
635 }
636}