use std::collections::BTreeSet;
use std::convert::TryFrom;
use std::fs;
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::process;
use std::thread;
use std::time::{Duration, Instant, SystemTime};
use anyhow::{bail, Context, Result};
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use tracing::debug;
use crate::extensions::backlog_state::{
self as backlog, BacklogRef, GitHubBacklogCache, GitHubBacklogDispatchView,
};
use crate::paths::state::{ExtensionDispatchStatePaths, StateLayout};
const EXTENSION_DISPATCH_LOCK_FILE: &str = ".extension-dispatch.lock";
const EXTENSION_DISPATCH_LOCK_POLL_MS: u64 = 50;
const EXTENSION_DISPATCH_LOCK_TIMEOUT_MS: u64 = 5_000;
const EXTENSION_DISPATCH_LOCK_STALE_AFTER_SECS: u64 = 60;
const EXTENSION_DISPATCH_DB_SCHEMA_VERSION: u32 = 1;
const EXTENSION_DISPATCH_DB_SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS assignments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
backlog_provider TEXT,
backlog_kind TEXT,
backlog_id TEXT,
backlog_url TEXT NOT NULL DEFAULT '',
ccd_id INTEGER NOT NULL DEFAULT 0,
github_issue_number INTEGER NOT NULL DEFAULT 0,
session_id TEXT NOT NULL DEFAULT '',
worktree TEXT NOT NULL,
branch TEXT
);
"#;
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
pub struct LocalAssignmentView {
pub backlog_ref: BacklogRef,
pub ccd_id: u64,
pub github_issue_number: u64,
pub title: String,
pub session_id: String,
pub branch: Option<String>,
pub worktree: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct LocalDispatchView {
pub path: String,
pub status: &'static str,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub dispatch: Option<GitHubBacklogDispatchView>,
#[serde(skip_serializing_if = "Option::is_none")]
pub assignment: Option<LocalAssignmentView>,
}
pub struct LocalAssignmentRequest<'a> {
pub session_id: &'a str,
pub branch: Option<&'a str>,
pub cache: Option<&'a GitHubBacklogCache>,
pub cache_status: &'static str,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
struct DispatchAssignment {
#[serde(default)]
backlog_ref: Option<BacklogRef>,
ccd_id: u64,
#[serde(default)]
github_issue_number: u64,
#[serde(default)]
session_id: String,
worktree: String,
#[serde(default)]
branch: Option<String>,
}
struct DispatchLock {
path: PathBuf,
}
impl Drop for DispatchLock {
fn drop(&mut self) {
let _ = fs::remove_file(&self.path);
}
}
#[cfg(any(feature = "extension-backlog", test))]
pub fn load_claimed_ids(paths: &ExtensionDispatchStatePaths) -> Result<BTreeSet<u64>> {
Ok(load_dispatch_state(paths)?
.into_iter()
.filter_map(|assignment| (assignment.ccd_id != 0).then_some(assignment.ccd_id))
.collect())
}
#[cfg(any(feature = "extension-backlog", test))]
pub fn load_claimed_refs(paths: &ExtensionDispatchStatePaths) -> Result<BTreeSet<String>> {
Ok(load_dispatch_state(paths)?
.into_iter()
.filter_map(|assignment| {
assignment
.backlog_ref
.filter(|backlog_ref| !backlog_ref.is_empty())
.map(|backlog_ref| backlog_ref.key())
})
.collect())
}
pub fn ensure_local_assignment(
repo_root: &Path,
layout: &StateLayout,
locality_id: &str,
request: LocalAssignmentRequest<'_>,
) -> Result<LocalDispatchView> {
let LocalAssignmentRequest {
session_id,
branch,
cache,
cache_status,
} = request;
let dispatch_paths = layout.extension_dispatch_state_paths(locality_id)?;
let dispatch_path = dispatch_paths.primary().to_path_buf();
let _lock = acquire_dispatch_lock(&dispatch_path)?;
let mut state = load_dispatch_assignments(&dispatch_paths)?;
let worktree = repo_root
.file_name()
.map(|value| value.to_string_lossy().into_owned())
.unwrap_or_else(|| repo_root.display().to_string());
let mut state_changed = false;
if !session_id.is_empty() {
let original_len = state.len();
state.retain(|assignment| !assignment.session_id.is_empty());
if state.len() != original_len {
state_changed = true;
}
}
if let Some(cache) = cache {
if hydrate_assignments_from_cache(&mut state, cache) {
state_changed = true;
}
let original_len = state.len();
state.retain(|assignment| {
cache
.items
.iter()
.find(|item| assignment_matches_item(assignment, item))
.map(|item| item.is_active())
.unwrap_or(true)
});
if state.len() != original_len {
state_changed = true;
}
}
let existing = find_session_assignment(&state, session_id)
.or_else(|| branch.and_then(|value| find_branch_assignment(&state, value)));
if let Some(existing) = existing {
if state_changed {
persist_dispatch_state(&dispatch_path, &state)?;
}
return Ok(LocalDispatchView {
path: dispatch_path.display().to_string(),
status: "existing",
reason: None,
dispatch: None,
assignment: Some(view_for_assignment(existing, cache)),
});
}
if session_id.is_empty() && branch.is_none() {
if state_changed {
persist_dispatch_state(&dispatch_path, &state)?;
}
return Ok(LocalDispatchView {
path: dispatch_path.display().to_string(),
status: "skipped",
reason: Some("detached HEAD does not allow stable local assignment".to_owned()),
dispatch: None,
assignment: None,
});
}
let Some(cache) = cache else {
if state_changed {
persist_dispatch_state(&dispatch_path, &state)?;
}
return Ok(LocalDispatchView {
path: dispatch_path.display().to_string(),
status: "skipped",
reason: None,
dispatch: None,
assignment: None,
});
};
if cache_status == "stale" {
let claimed_ref_keys = claimed_ref_keys(&state);
let claimed_ids = claimed_ids(&state);
if state_changed {
persist_dispatch_state(&dispatch_path, &state)?;
}
return Ok(LocalDispatchView {
path: dispatch_path.display().to_string(),
status: "skipped",
reason: Some("work queue cache is stale".to_owned()),
dispatch: backlog::dispatch_summary_view(cache, &claimed_ref_keys, &claimed_ids),
assignment: None,
});
}
let dep_errors = backlog::dep_graph_errors(cache);
if !dep_errors.is_empty() {
if state_changed {
persist_dispatch_state(&dispatch_path, &state)?;
}
return Ok(LocalDispatchView {
path: dispatch_path.display().to_string(),
status: "skipped",
reason: Some(format!(
"work queue has dependency graph errors: {}",
dep_errors.join("; ")
)),
dispatch: None,
assignment: None,
});
}
let claimed_ref_keys = claimed_ref_keys(&state);
let claimed_ids = claimed_ids(&state);
let dispatch = backlog::dispatch_summary_view(cache, &claimed_ref_keys, &claimed_ids);
let Some(next) = backlog::next_dispatch_item(cache, &claimed_ref_keys, &claimed_ids) else {
if state_changed {
persist_dispatch_state(&dispatch_path, &state)?;
}
return Ok(LocalDispatchView {
path: dispatch_path.display().to_string(),
status: "skipped",
reason: dispatch
.as_ref()
.and_then(|view| view.reason.clone())
.or_else(|| Some("no ready unclaimed backlog item is available".to_owned())),
dispatch,
assignment: None,
});
};
let assignment = DispatchAssignment {
backlog_ref: (!next.backlog_ref.is_empty()).then_some(next.backlog_ref.clone()),
ccd_id: next.ccd_id,
github_issue_number: next.github_issue_number,
session_id: session_id.to_owned(),
branch: branch.map(str::to_owned),
worktree,
};
state.push(assignment.clone());
persist_dispatch_state(&dispatch_path, &state)?;
Ok(LocalDispatchView {
path: dispatch_path.display().to_string(),
status: "assigned",
reason: None,
dispatch,
assignment: Some(view_for_assignment(&assignment, Some(cache))),
})
}
pub fn ensure_local_assignment_by_branch(
repo_root: &Path,
layout: &StateLayout,
locality_id: &str,
branch: &str,
cache: Option<&GitHubBacklogCache>,
cache_status: &'static str,
) -> Result<LocalDispatchView> {
ensure_local_assignment(
repo_root,
layout,
locality_id,
LocalAssignmentRequest {
session_id: "",
branch: if branch == "HEAD" { None } else { Some(branch) },
cache,
cache_status,
},
)
}
pub(crate) fn resolve_branch_assignment(
layout: &StateLayout,
locality_id: &str,
branch: &str,
cache: Option<&GitHubBacklogCache>,
) -> Result<Option<LocalAssignmentView>> {
if branch == "HEAD" {
return Ok(None);
}
let dispatch_paths = layout.extension_dispatch_state_paths(locality_id)?;
let state = load_dispatch_state(&dispatch_paths)?;
Ok(find_branch_assignment(&state, branch)
.map(|assignment| view_for_assignment(assignment, cache)))
}
pub fn resolve_session_assignment(
layout: &StateLayout,
locality_id: &str,
session_id: &str,
cache: Option<&GitHubBacklogCache>,
) -> Result<Option<LocalAssignmentView>> {
if session_id.is_empty() {
return Ok(None);
}
let dispatch_paths = layout.extension_dispatch_state_paths(locality_id)?;
let state = load_dispatch_state(&dispatch_paths)?;
Ok(find_session_assignment(&state, session_id)
.map(|assignment| view_for_assignment(assignment, cache)))
}
pub fn adopt_unsessioned_entries(
layout: &StateLayout,
locality_id: &str,
session_id: &str,
) -> Result<Vec<LocalAssignmentView>> {
let dispatch_paths = layout.extension_dispatch_state_paths(locality_id)?;
adopt_unsessioned_entries_for_paths(&dispatch_paths, session_id)
}
fn adopt_unsessioned_entries_for_paths(
dispatch_paths: &ExtensionDispatchStatePaths,
session_id: &str,
) -> Result<Vec<LocalAssignmentView>> {
let dispatch_path = dispatch_paths.primary().to_path_buf();
let _lock = acquire_dispatch_lock(&dispatch_path)?;
let mut state = load_dispatch_assignments(dispatch_paths)?;
let mut adopted = Vec::new();
for assignment in &mut state {
if assignment.session_id.is_empty() {
assignment.session_id = session_id.to_owned();
adopted.push(view_for_assignment(assignment, None));
}
}
if !adopted.is_empty() {
persist_dispatch_state(&dispatch_path, &state)?;
}
Ok(adopted)
}
pub fn remove_session_entries(
layout: &StateLayout,
locality_id: &str,
session_id: &str,
) -> Result<()> {
let dispatch_paths = layout.extension_dispatch_state_paths(locality_id)?;
remove_session_entries_for_paths(&dispatch_paths, session_id)
}
fn remove_session_entries_for_paths(
dispatch_paths: &ExtensionDispatchStatePaths,
session_id: &str,
) -> Result<()> {
let dispatch_path = dispatch_paths.primary().to_path_buf();
let _lock = acquire_dispatch_lock(&dispatch_path)?;
let mut state = load_dispatch_assignments(dispatch_paths)?;
let original_len = state.len();
state.retain(|assignment| assignment.session_id != session_id);
if state.len() != original_len {
persist_dispatch_state(&dispatch_path, &state)?;
}
Ok(())
}
fn load_dispatch_state(paths: &ExtensionDispatchStatePaths) -> Result<Vec<DispatchAssignment>> {
let dispatch_path = paths.primary().to_path_buf();
let _lock = acquire_dispatch_lock(&dispatch_path)?;
load_dispatch_assignments(paths)
}
fn load_dispatch_assignments(
paths: &ExtensionDispatchStatePaths,
) -> Result<Vec<DispatchAssignment>> {
match read_dispatch_db(paths.primary())? {
Some(assignments) => Ok(assignments),
None => Ok(Vec::new()),
}
}
fn acquire_dispatch_lock(dispatch_path: &Path) -> Result<DispatchLock> {
let parent = dispatch_path.parent().ok_or_else(|| {
anyhow::anyhow!("dispatch path has no parent: {}", dispatch_path.display())
})?;
fs::create_dir_all(parent).with_context(|| format!("failed to create {}", parent.display()))?;
let lock_path = parent.join(EXTENSION_DISPATCH_LOCK_FILE);
let deadline = Instant::now() + Duration::from_millis(EXTENSION_DISPATCH_LOCK_TIMEOUT_MS);
loop {
match create_lock_file(&lock_path) {
Ok(()) => return Ok(DispatchLock { path: lock_path }),
Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {
if Instant::now() >= deadline {
if clear_stale_lock(&lock_path)? {
continue;
}
bail!(
"backlog dispatch lock is busy at {}; another `ccd start` may still be assigning work in this pod",
lock_path.display()
);
}
thread::sleep(Duration::from_millis(EXTENSION_DISPATCH_LOCK_POLL_MS));
}
Err(error) => {
return Err(error)
.with_context(|| format!("failed to acquire {}", lock_path.display()));
}
}
}
}
fn clear_stale_lock(path: &Path) -> Result<bool> {
let metadata = match fs::metadata(path) {
Ok(metadata) => metadata,
Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(true),
Err(error) => {
return Err(error).with_context(|| format!("failed to stat {}", path.display()));
}
};
let modified = metadata
.modified()
.with_context(|| format!("failed to read lock mtime for {}", path.display()))?;
let age = SystemTime::now()
.duration_since(modified)
.unwrap_or_default();
if age < Duration::from_secs(EXTENSION_DISPATCH_LOCK_STALE_AFTER_SECS) {
return Ok(false);
}
fs::remove_file(path).with_context(|| format!("failed to remove {}", path.display()))?;
Ok(true)
}
fn create_lock_file(path: &Path) -> io::Result<()> {
let mut file = OpenOptions::new().write(true).create_new(true).open(path)?;
writeln!(file, "pid={} kind=extension-dispatch", process::id())?;
file.sync_all()
}
fn find_branch_assignment<'a>(
assignments: &'a [DispatchAssignment],
branch: &str,
) -> Option<&'a DispatchAssignment> {
assignments
.iter()
.find(|assignment| assignment.branch.as_deref() == Some(branch))
}
fn find_session_assignment<'a>(
assignments: &'a [DispatchAssignment],
session_id: &str,
) -> Option<&'a DispatchAssignment> {
if session_id.is_empty() {
return None;
}
assignments
.iter()
.find(|assignment| assignment.session_id == session_id)
}
fn persist_dispatch_state(dispatch_path: &Path, assignments: &[DispatchAssignment]) -> Result<()> {
write_dispatch_state(dispatch_path, assignments)
}
fn write_dispatch_state(dispatch_path: &Path, assignments: &[DispatchAssignment]) -> Result<()> {
debug!(path = %dispatch_path.display(), count = assignments.len(), "writing dispatch state");
let mut conn = open_dispatch_db(dispatch_path)?;
let tx = conn.transaction()?;
tx.execute("DELETE FROM assignments", [])?;
for assignment in assignments {
let (backlog_provider, backlog_kind, backlog_id, backlog_url) =
assignment_backlog_columns(assignment);
tx.execute(
"INSERT INTO assignments (
backlog_provider,
backlog_kind,
backlog_id,
backlog_url,
ccd_id,
github_issue_number,
session_id,
worktree,
branch
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
rusqlite::params![
backlog_provider,
backlog_kind,
backlog_id,
backlog_url,
i64::try_from(assignment.ccd_id)
.with_context(|| format!("ccd_id overflow for {}", dispatch_path.display()))?,
i64::try_from(assignment.github_issue_number).with_context(|| {
format!(
"github_issue_number overflow for {}",
dispatch_path.display()
)
})?,
assignment.session_id,
assignment.worktree,
assignment.branch,
],
)?;
}
tx.commit()?;
Ok(())
}
fn read_dispatch_db(path: &Path) -> Result<Option<Vec<DispatchAssignment>>> {
if !path.exists() {
return Ok(None);
}
let conn = open_dispatch_db(path)?;
let mut stmt = conn.prepare(
"SELECT backlog_provider, backlog_kind, backlog_id, backlog_url,
ccd_id, github_issue_number, session_id, worktree, branch
FROM assignments
ORDER BY id ASC",
)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, Option<String>>(0)?,
row.get::<_, Option<String>>(1)?,
row.get::<_, Option<String>>(2)?,
row.get::<_, String>(3)?,
row.get::<_, i64>(4)?,
row.get::<_, i64>(5)?,
row.get::<_, String>(6)?,
row.get::<_, String>(7)?,
row.get::<_, Option<String>>(8)?,
))
})?;
let mut assignments = Vec::new();
for row in rows {
let (
backlog_provider,
backlog_kind,
backlog_id,
backlog_url,
ccd_id,
github_issue_number,
session_id,
worktree,
branch,
) = row?;
let ccd_id = u64::try_from(ccd_id)
.with_context(|| format!("invalid ccd_id value `{ccd_id}` in {}", path.display()))?;
let github_issue_number = u64::try_from(github_issue_number).with_context(|| {
format!(
"invalid github_issue_number value `{github_issue_number}` in {}",
path.display()
)
})?;
assignments.push(DispatchAssignment {
backlog_ref: build_backlog_ref(backlog_provider, backlog_kind, backlog_id, backlog_url),
ccd_id,
github_issue_number,
session_id,
worktree,
branch,
});
}
Ok(Some(assignments))
}
fn open_dispatch_db(path: &Path) -> Result<Connection> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)
.with_context(|| format!("failed to create {}", parent.display()))?;
}
let conn =
Connection::open(path).with_context(|| format!("open dispatch db: {}", path.display()))?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
migrate_dispatch_db(&conn, path)?;
Ok(conn)
}
fn migrate_dispatch_db(conn: &Connection, path: &Path) -> Result<()> {
let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?;
if version == EXTENSION_DISPATCH_DB_SCHEMA_VERSION {
return Ok(());
}
if version > EXTENSION_DISPATCH_DB_SCHEMA_VERSION {
bail!(
"backlog dispatch db schema version {} is newer than supported ({}) at {}; upgrade the ccd CLI",
version,
EXTENSION_DISPATCH_DB_SCHEMA_VERSION,
path.display()
);
}
if version == 0 {
conn.execute_batch(EXTENSION_DISPATCH_DB_SCHEMA)?;
conn.pragma_update(None, "user_version", EXTENSION_DISPATCH_DB_SCHEMA_VERSION)?;
return Ok(());
}
bail!(
"unsupported backlog dispatch db schema version {} at {}; delete or migrate the DB before retrying",
version,
path.display()
)
}
fn assignment_backlog_columns(
assignment: &DispatchAssignment,
) -> (Option<&str>, Option<&str>, Option<&str>, String) {
let backlog_ref = assignment
.backlog_ref
.as_ref()
.filter(|backlog_ref| !backlog_ref.is_empty());
(
backlog_ref.map(|backlog_ref| backlog_ref.provider.as_str()),
backlog_ref.map(|backlog_ref| backlog_ref.kind.as_str()),
backlog_ref.map(|backlog_ref| backlog_ref.id.as_str()),
backlog_ref
.map(|backlog_ref| backlog_ref.url.clone())
.unwrap_or_default(),
)
}
fn build_backlog_ref(
provider: Option<String>,
kind: Option<String>,
id: Option<String>,
url: String,
) -> Option<BacklogRef> {
let provider = provider.filter(|value| !value.is_empty())?;
let kind = kind.filter(|value| !value.is_empty())?;
let id = id.filter(|value| !value.is_empty())?;
Some(BacklogRef {
provider,
kind,
id,
url,
})
}
fn hydrate_assignments_from_cache(
assignments: &mut [DispatchAssignment],
cache: &GitHubBacklogCache,
) -> bool {
let mut changed = false;
for assignment in assignments {
let Some(item) = cache
.items
.iter()
.find(|item| assignment_matches_item(assignment, item))
else {
continue;
};
if assignment
.backlog_ref
.as_ref()
.map(BacklogRef::is_empty)
.unwrap_or(true)
&& !item.backlog_ref.is_empty()
{
assignment.backlog_ref = Some(item.backlog_ref.clone());
changed = true;
}
if assignment.github_issue_number == 0 && item.github_issue_number != 0 {
assignment.github_issue_number = item.github_issue_number;
changed = true;
}
if assignment.ccd_id == 0 && item.ccd_id != 0 {
assignment.ccd_id = item.ccd_id;
changed = true;
}
}
changed
}
fn view_for_assignment(
assignment: &DispatchAssignment,
cache: Option<&GitHubBacklogCache>,
) -> LocalAssignmentView {
let matched_item = cache.and_then(|items| {
items
.items
.iter()
.find(|item| assignment_matches_item(assignment, item))
});
let title = matched_item
.map(|item| backlog::display_title(&item.title))
.unwrap_or_else(|| {
if assignment.ccd_id != 0 {
format!("ccd#{}", assignment.ccd_id)
} else if assignment.github_issue_number != 0 {
format!("GH#{}", assignment.github_issue_number)
} else if let Some(backlog_ref) = assignment
.backlog_ref
.as_ref()
.filter(|backlog_ref| !backlog_ref.is_empty())
{
backlog_ref.key()
} else {
"backlog item".to_owned()
}
});
LocalAssignmentView {
backlog_ref: matched_item
.map(|item| item.backlog_ref.clone())
.filter(|backlog_ref| !backlog_ref.is_empty())
.or_else(|| {
assignment
.backlog_ref
.clone()
.filter(|backlog_ref| !backlog_ref.is_empty())
})
.unwrap_or_default(),
ccd_id: assignment.ccd_id,
github_issue_number: matched_item
.map(|item| item.github_issue_number)
.unwrap_or(assignment.github_issue_number),
title,
session_id: assignment.session_id.clone(),
branch: assignment.branch.clone(),
worktree: assignment.worktree.clone(),
}
}
fn claimed_ids(assignments: &[DispatchAssignment]) -> BTreeSet<u64> {
assignments
.iter()
.filter_map(|assignment| (assignment.ccd_id != 0).then_some(assignment.ccd_id))
.collect()
}
fn claimed_ref_keys(assignments: &[DispatchAssignment]) -> BTreeSet<String> {
assignments
.iter()
.filter_map(|assignment| {
assignment
.backlog_ref
.as_ref()
.filter(|backlog_ref| !backlog_ref.is_empty())
.map(BacklogRef::key)
})
.collect()
}
fn assignment_matches_item(
assignment: &DispatchAssignment,
item: &backlog::GitHubBacklogItem,
) -> bool {
assignment
.backlog_ref
.as_ref()
.filter(|backlog_ref| !backlog_ref.is_empty() && !item.backlog_ref.is_empty())
.map(|backlog_ref| backlog_ref.key() == item.backlog_ref.key())
.unwrap_or(false)
|| (assignment.github_issue_number != 0
&& item.backlog_ref.provider == "github-issues"
&& item.github_issue_number == assignment.github_issue_number)
|| (assignment.ccd_id != 0 && item.ccd_id == assignment.ccd_id)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::paths::state::ExtensionDispatchStatePaths;
fn dispatch_paths_primary_only(path: &Path) -> ExtensionDispatchStatePaths {
ExtensionDispatchStatePaths::new(path.to_path_buf())
}
fn write_dispatch_assignments(path: &Path, assignments: &[DispatchAssignment]) {
write_dispatch_state(path, assignments).unwrap();
}
fn read_dispatch_assignments(path: &Path) -> Vec<DispatchAssignment> {
read_dispatch_db(path).unwrap().unwrap_or_default()
}
#[test]
fn load_claimed_ids_extracts_ids_from_private_dispatch_db() {
let dir = tempfile::tempdir().unwrap();
let dispatch_path = dir.path().join("dispatch-state.db");
write_dispatch_assignments(
&dispatch_path,
&[
DispatchAssignment {
backlog_ref: None,
ccd_id: 42,
github_issue_number: 0,
session_id: "ses_1".to_owned(),
worktree: "repo".to_owned(),
branch: Some("session/foo".to_owned()),
},
DispatchAssignment {
backlog_ref: None,
ccd_id: 99,
github_issue_number: 0,
session_id: "ses_2".to_owned(),
worktree: "repo".to_owned(),
branch: Some("session/bar".to_owned()),
},
],
);
let ids = load_claimed_ids(&dispatch_paths_primary_only(&dispatch_path)).unwrap();
assert_eq!(ids, BTreeSet::from([42, 99]));
}
#[test]
fn read_dispatch_db_fails_closed_on_negative_ccd_id() {
let dir = tempfile::tempdir().unwrap();
let dispatch_path = dir.path().join("extensions/backlog/dispatch-state.db");
let conn = open_dispatch_db(&dispatch_path).unwrap();
conn.execute(
"INSERT INTO assignments (
backlog_provider, backlog_kind, backlog_id, backlog_url,
ccd_id, github_issue_number, session_id, worktree, branch
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
rusqlite::params![
Option::<String>::None,
Option::<String>::None,
Option::<String>::None,
"",
-1_i64,
0_i64,
"ses_bad",
"repo",
Option::<String>::None,
],
)
.unwrap();
let err = read_dispatch_db(&dispatch_path).unwrap_err();
assert!(err.to_string().contains("invalid ccd_id value `-1`"));
}
#[test]
fn adopt_unsessioned_entries_promotes_all_db_entries() {
let dir = tempfile::tempdir().unwrap();
let dispatch_path = dir.path().join("extensions/backlog/dispatch-state.db");
write_dispatch_assignments(
&dispatch_path,
&[
DispatchAssignment {
backlog_ref: None,
ccd_id: 42,
github_issue_number: 0,
session_id: String::new(),
worktree: "repo".to_owned(),
branch: Some("main".to_owned()),
},
DispatchAssignment {
backlog_ref: None,
ccd_id: 99,
github_issue_number: 0,
session_id: "ses_EXISTING".to_owned(),
worktree: "repo".to_owned(),
branch: Some("feature".to_owned()),
},
],
);
let dispatch_paths = dispatch_paths_primary_only(&dispatch_path);
let adopted = adopt_unsessioned_entries_for_paths(&dispatch_paths, "ses_NEW").unwrap();
assert_eq!(adopted.len(), 1);
assert_eq!(adopted[0].ccd_id, 42);
assert_eq!(adopted[0].session_id, "ses_NEW");
let assignments = read_dispatch_assignments(&dispatch_path);
assert_eq!(assignments.len(), 2);
assert!(assignments
.iter()
.any(|assignment| assignment.session_id == "ses_NEW"));
assert!(assignments
.iter()
.any(|assignment| assignment.session_id == "ses_EXISTING"));
}
#[test]
fn remove_session_entries_clears_matching_session() {
let dir = tempfile::tempdir().unwrap();
let dispatch_path = dir.path().join("dispatch-state.db");
write_dispatch_assignments(
&dispatch_path,
&[
DispatchAssignment {
backlog_ref: None,
ccd_id: 42,
github_issue_number: 0,
session_id: "ses_OLD".to_owned(),
worktree: "repo".to_owned(),
branch: Some("main".to_owned()),
},
DispatchAssignment {
backlog_ref: None,
ccd_id: 99,
github_issue_number: 0,
session_id: "ses_KEEP".to_owned(),
worktree: "repo".to_owned(),
branch: Some("feature".to_owned()),
},
],
);
let dispatch_paths = ExtensionDispatchStatePaths::new(dispatch_path.clone());
remove_session_entries_for_paths(&dispatch_paths, "ses_OLD").unwrap();
let assignments = read_dispatch_assignments(&dispatch_path);
assert_eq!(assignments.len(), 1);
assert_eq!(assignments[0].session_id, "ses_KEEP");
}
}