use anyhow::{Context, Result};
use rustix::fs::{flock, FlockOperation};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Seek, Write};
use std::os::fd::AsFd;
use std::path::{Path, PathBuf};
/// Status strings that may be stored on an item; `Queue::update` rejects any other value.
pub const VALID_STATUSES: &[&str] = &["pending", "in_progress", "closed"];
/// Stored statuses plus `"blocked"`, the derived status that `Item::computed_status`
/// can report for a pending item with blockers.
pub const VALID_DISPLAY_STATUSES: &[&str] = &["pending", "blocked", "in_progress", "closed"];
/// Accepted values for `Source::type_`, enforced by `Queue::validate_source_types`.
pub const VALID_SOURCE_TYPES: &[&str] = &["diff", "file", "text", "directory"];
/// Inclusive range of accepted priority values.
pub const VALID_PRIORITY_RANGE: std::ops::RangeInclusive<u8> = 0..=4;
// 32-symbol alphabet for generated ids: the digits plus lowercase letters,
// leaving out `i`, `l`, `o` and `u`.
const ID_ALPHABET: [char; 32] = [
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j',
'k', 'm', 'n', 'p', 'q', 'r', 's', 't', 'v', 'w', 'x', 'y', 'z',
];
// Shortest id length that `generated_id_length` will ever return.
const MIN_GENERATED_ID_LENGTH: usize = 3;
// Once the number of known ids reaches this percentage of the id space for a
// given length, `generated_id_length` moves on to the next longer length.
const ID_SPACE_OCCUPANCY_THRESHOLD_PERCENT: usize = 10;
/// One queue entry, stored as a single JSON object per line of the queue file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Item {
/// Identifier; `Queue` generates it so that it is unique among the ids read from the file.
pub id: String,
/// Optional title; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
/// Optional description; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
/// Stored status string. New items start as `"pending"`; `Queue::update` only
/// accepts values listed in `VALID_STATUSES`.
pub status: String,
/// Optional priority; absent on input means `None`, and `None` is not serialized.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub priority: Option<u8>,
/// Sources attached to the item.
pub sources: Vec<Source>,
/// Caller-supplied JSON value, stored as given.
pub metadata: serde_json::Value,
/// Timestamp string written when the item is created (see `now_iso8601`).
pub created_at: String,
/// Timestamp string refreshed by `Queue::update` whenever the item actually changes.
pub updated_at: String,
/// Ids of items that block this one; defaults to empty and is not serialized when empty.
#[serde(skip_serializing_if = "is_empty_vec")]
#[serde(default)]
pub blocked_by: Vec<String>,
/// JSON values attached to the item; defaults to empty and is not serialized when empty.
/// NOTE(review): nothing in this file writes to it beyond initializing it empty —
/// confirm the intended contents with the callers.
#[serde(skip_serializing_if = "is_empty_json_vec")]
#[serde(default)]
pub errors: Vec<serde_json::Value>,
}
/// Serde `skip_serializing_if` predicate: true when the string list has no elements.
fn is_empty_vec(v: &[String]) -> bool {
    matches!(v, [])
}
/// Serde `skip_serializing_if` predicate: true when the JSON value list has no elements.
fn is_empty_json_vec(v: &[serde_json::Value]) -> bool {
    v.first().is_none()
}
impl Item {
    /// Serializes the item into a `serde_json::Value`.
    ///
    /// # Panics
    /// Panics if serialization fails.
    pub fn to_json_value(&self) -> serde_json::Value {
        let value = serde_json::to_value(self);
        value.expect("item serialization should succeed")
    }

    /// Serializes the item into a compact JSON string.
    ///
    /// # Panics
    /// Panics if serialization fails.
    pub fn to_json_string(&self) -> String {
        let encoded = serde_json::to_string(self);
        encoded.expect("item serialization should succeed")
    }

    /// True when the stored status is exactly `"pending"`.
    pub fn pending(&self) -> bool {
        matches!(self.status.as_str(), "pending")
    }

    /// True when the item lists at least one blocker id, whether or not that
    /// blocker is still open.
    pub fn blocked(&self) -> bool {
        self.blocked_by.first().is_some()
    }

    /// Returns the status to display for this item.
    ///
    /// A pending item with blockers is reported as `"blocked"` when `open_ids`
    /// is `None`, or when at least one of its blockers is contained in
    /// `open_ids`. In every other case the stored status is returned.
    pub fn computed_status(&self, open_ids: Option<&HashSet<String>>) -> String {
        let held_back = self.pending()
            && self.blocked()
            && open_ids.map_or(true, |ids| self.blocked_by.iter().any(|dep| ids.contains(dep)));
        if held_back {
            "blocked".to_string()
        } else {
            self.status.clone()
        }
    }

    /// Returns a copy of the item whose `status` is replaced by
    /// `computed_status(open_ids)`.
    pub fn with_computed_status(&self, open_ids: Option<&HashSet<String>>) -> Self {
        Self {
            status: self.computed_status(open_ids),
            ..self.clone()
        }
    }

    /// Reports whether the item can be picked up: it must be pending and none
    /// of its blockers may appear in `open_ids`.
    ///
    /// Note that with `open_ids == None` a pending item is always considered
    /// ready, even if it lists blockers, whereas `computed_status(None)`
    /// reports such an item as `"blocked"`.
    pub fn ready(&self, open_ids: Option<&HashSet<String>>) -> bool {
        match (self.pending(), open_ids) {
            (false, _) => false,
            (true, None) => true,
            (true, Some(ids)) => !self.blocked_by.iter().any(|dep| ids.contains(dep)),
        }
    }
}
/// Parses a priority from user input.
///
/// Surrounding whitespace is ignored for parsing. The error message quotes the
/// original, untrimmed `input`.
///
/// # Errors
/// Fails when the trimmed input is not a `u8` or lies outside
/// `VALID_PRIORITY_RANGE`.
pub fn parse_priority_value(input: &str) -> Result<u8> {
    let parsed: u8 = input
        .trim()
        .parse()
        .with_context(|| format!("Invalid priority: {input}. Valid: 0-4"))?;
    if VALID_PRIORITY_RANGE.contains(&parsed) {
        Ok(parsed)
    } else {
        anyhow::bail!("Invalid priority: {input}. Valid: 0-4")
    }
}
/// A source attached to an item.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Source {
/// Kind of source, serialized under the JSON key `type`; `Queue` only accepts
/// values listed in `VALID_SOURCE_TYPES`.
#[serde(rename = "type")]
pub type_: String,
/// Optional path; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
/// Optional content; left out of the serialized form when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<String>,
}
impl Source {
    /// Serializes the source into a `serde_json::Value`.
    ///
    /// # Panics
    /// Panics if serialization fails.
    pub fn to_json_value(&self) -> serde_json::Value {
        let value = serde_json::to_value(self);
        value.expect("source serialization should succeed")
    }
}
/// Handle to a queue stored as a file with one JSON-encoded `Item` per line.
pub struct Queue {
/// Location of the queue file.
pub path: PathBuf,
}
/// Caller-supplied fields for an item that is about to be created; the id,
/// status and timestamps are filled in by `Queue::push_many_with_description`.
pub struct NewItem {
/// Sources to attach; each `type_` must be listed in `VALID_SOURCE_TYPES`.
pub sources: Vec<Source>,
/// Optional title.
pub title: Option<String>,
/// Optional description.
pub description: Option<String>,
/// Optional priority; must lie in `VALID_PRIORITY_RANGE` when present.
pub priority: Option<u8>,
/// JSON value stored on the item as given.
pub metadata: serde_json::Value,
/// Ids of items that block the new item.
pub blocked_by: Vec<String>,
}
impl Queue {
pub fn new(path: PathBuf) -> Self {
Self { path }
}
/// Creates a single item from the given fields and appends it to the queue.
///
/// This is a convenience wrapper around `push_many_with_description` with a
/// one-element batch.
///
/// # Errors
/// Propagates validation and I/O errors from `push_many_with_description`.
pub fn push(
    &self,
    sources: Vec<Source>,
    title: Option<String>,
    description: Option<String>,
    priority: Option<u8>,
    metadata: serde_json::Value,
    blocked_by: Vec<String>,
) -> Result<Item> {
    let batch = vec![NewItem {
        sources,
        title,
        description,
        priority,
        metadata,
        blocked_by,
    }];
    // A successful batch push returns one item per request, so index 0 exists.
    self.push_many_with_description(batch)
        .map(|mut created| created.remove(0))
}
/// Validates `items`, then appends them to the queue file under an exclusive lock.
///
/// Every new item gets a generated id that is unique among the ids already in
/// the file and the ids created earlier in the same batch, the status
/// `"pending"`, and identical `created_at` / `updated_at` timestamps.
///
/// The whole batch is serialized first and appended with a single write, so a
/// failure cannot leave only part of the batch behind. If the existing file
/// does not end with a newline, one is inserted first; otherwise the first new
/// record would be glued onto the last existing line and both would later be
/// skipped as corrupt.
///
/// # Errors
/// Fails when `items` is empty, when any item fails validation, or on I/O errors.
pub fn push_many_with_description(&self, items: Vec<NewItem>) -> Result<Vec<Item>> {
    if items.is_empty() {
        anyhow::bail!("At least one item is required");
    }
    for item in &items {
        self.validate_new_item(item)?;
    }
    self.with_exclusive_lock(|f| {
        let mut existing_ids: HashSet<String> = read_items(f, &self.path)
            .into_iter()
            .map(|item| item.id)
            .collect();
        let now = now_iso8601();

        let mut payload = String::new();
        let end = f.seek(std::io::SeekFrom::End(0))?;
        if end > 0 {
            // Inspect the final byte; reading it leaves the cursor back at the end.
            let mut last = [0u8; 1];
            f.seek(std::io::SeekFrom::End(-1))?;
            std::io::Read::read_exact(&mut *f, &mut last)?;
            if last[0] != b'\n' {
                payload.push('\n');
            }
        }

        let mut created = Vec::with_capacity(items.len());
        for new_item in items {
            let id = generate_id(&existing_ids);
            // Reserve the id so later items in this batch cannot reuse it.
            existing_ids.insert(id.clone());
            let item = Item {
                id,
                title: new_item.title,
                description: new_item.description,
                status: "pending".to_string(),
                priority: new_item.priority,
                sources: new_item.sources,
                metadata: new_item.metadata,
                created_at: now.clone(),
                updated_at: now.clone(),
                blocked_by: new_item.blocked_by,
                errors: Vec::new(),
            };
            payload.push_str(&item.to_json_string());
            payload.push('\n');
            created.push(item);
        }

        f.write_all(payload.as_bytes())?;
        f.flush()?;
        Ok(created)
    })
}
/// Reads every item from the queue file under a shared lock.
///
/// Returns an empty list when the file does not exist or when opening or
/// locking it fails; such errors are not reported to the caller.
pub fn all(&self) -> Vec<Item> {
    if !self.path.exists() {
        return Vec::new();
    }
    self.with_shared_lock(|f| Ok(read_items(f, &self.path)))
        .unwrap_or_default()
}
/// Returns the first item whose id equals `id`, or `None` if there is none.
pub fn find(&self, id: &str) -> Option<Item> {
    for item in self.all() {
        if item.id == id {
            return Some(item);
        }
    }
    None
}
/// Returns the ids of all items in the queue file whose status is not `"closed"`.
pub fn open_ids(&self) -> HashSet<String> {
    let items = self.all();
    open_ids_for_items(&items)
}
/// Returns `item` with its status replaced by the computed status, using the
/// open ids currently found in the queue file.
pub fn item_with_computed_status(&self, item: Item) -> Item {
    item.with_computed_status(Some(&self.open_ids()))
}
/// Returns `items`, in the same order, with each status replaced by its
/// computed status. The queue file is read once to determine the open ids.
pub fn items_with_computed_status(&self, items: Vec<Item>) -> Vec<Item> {
    let open = self.open_ids();
    let mut resolved = Vec::with_capacity(items.len());
    for item in &items {
        resolved.push(item.with_computed_status(Some(&open)));
    }
    resolved
}
/// Reads the queue once and returns every item with its computed status; the
/// open ids are derived from that same read.
pub fn all_with_computed_status(&self) -> Vec<Item> {
    let stored = self.all();
    let open = open_ids_for_items(&stored);
    stored
        .iter()
        .map(|item| item.with_computed_status(Some(&open)))
        .collect()
}
/// Looks up the first item with the given id and returns it with its computed
/// status, based on a single read of the queue file. Returns `None` when no
/// item has that id.
pub fn find_with_computed_status(&self, id: &str) -> Option<Item> {
    let stored = self.all();
    let target = stored.iter().find(|candidate| candidate.id == id)?;
    let open = open_ids_for_items(&stored);
    Some(target.with_computed_status(Some(&open)))
}
/// Returns all items, or only those whose stored status equals `status` when
/// one is given. The comparison uses the stored status, not the computed one.
pub fn filter(&self, status: Option<&str>) -> Vec<Item> {
    let mut items = self.all();
    if let Some(wanted) = status {
        items.retain(|item| item.status == wanted);
    }
    items
}
pub fn ready(&self) -> Vec<Item> {
let items = self.all();
let open_ids: HashSet<String> = items
.iter()
.filter(|i| i.status != "closed")
.map(|i| i.id.clone())
.collect();
items
.into_iter()
.filter(|item| item.ready(Some(&open_ids)))
.collect()
}
pub fn update(&self, id: &str, attrs: UpdateAttrs) -> Result<Option<Item>> {
self.with_exclusive_lock(|f| {
let mut items = read_items(f, &self.path);
let index = items.iter().position(|item| item.id == id);
let index = match index {
Some(i) => i,
None => return Ok(None),
};
let original = items[index].clone();
let item = &mut items[index];
if let Some(status) = &attrs.status {
if !VALID_STATUSES.contains(&status.as_str()) {
anyhow::bail!(
"Invalid status: {}. Valid: {}",
status,
VALID_STATUSES.join(", ")
);
}
item.status = status.clone();
}
if let Some(title) = attrs.title {
item.title = Some(title);
}
if let Some(description) = attrs.description {
item.description = Some(description);
}
if let Some(priority) = attrs.priority {
if let Some(value) = priority {
validate_priority(value)?;
}
item.priority = priority;
}
if let Some(metadata) = attrs.metadata {
item.metadata = metadata;
}
if let Some(blocked_by) = attrs.blocked_by {
validate_blocked_by(id, &blocked_by)?;
item.blocked_by = blocked_by;
}
if let Some(sources) = attrs.sources {
self.validate_source_types(&sources)?;
item.sources = sources;
}
if *item == original {
return Ok(Some(original));
}
item.updated_at = now_iso8601();
let updated = item.clone();
rewrite_items(f, &items)?;
Ok(Some(updated))
})
}
/// Deletes the first item with the given id under an exclusive lock and
/// rewrites the file without it.
///
/// Returns the removed item, or `Ok(None)` (leaving the file untouched) when
/// no item has that id.
///
/// # Errors
/// Fails on I/O errors while locking or rewriting the file.
pub fn remove(&self, id: &str) -> Result<Option<Item>> {
    self.with_exclusive_lock(|f| {
        let mut items = read_items(f, &self.path);
        let Some(index) = items.iter().position(|candidate| candidate.id == id) else {
            return Ok(None);
        };
        let removed = items.remove(index);
        rewrite_items(f, &items)?;
        Ok(Some(removed))
    })
}
/// Checks a new item before it is written: the priority (if any) must be in
/// range, the item must carry at least one source, a title, or a description,
/// and every source type must be valid.
fn validate_new_item(&self, item: &NewItem) -> Result<()> {
    if let Some(priority) = item.priority {
        validate_priority(priority)?;
    }
    let has_content =
        !item.sources.is_empty() || item.title.is_some() || item.description.is_some();
    if !has_content {
        anyhow::bail!("Item requires at least one source, title, or description");
    }
    self.validate_source_types(&item.sources)
}
/// Fails with the first source whose `type_` is not listed in
/// `VALID_SOURCE_TYPES`; succeeds when all are valid or the list is empty.
fn validate_source_types(&self, sources: &[Source]) -> Result<()> {
    let offender = sources
        .iter()
        .find(|source| !VALID_SOURCE_TYPES.contains(&source.type_.as_str()));
    match offender {
        Some(source) => anyhow::bail!(
            "Invalid source type: {}. Valid: {}",
            source.type_,
            VALID_SOURCE_TYPES.join(", ")
        ),
        None => Ok(()),
    }
}
/// Creates the queue file's parent directory (and any missing ancestors) when
/// the path has a parent that does not exist yet.
fn ensure_directory(&self) -> Result<()> {
    match self.path.parent() {
        Some(parent) if !parent.exists() => {
            fs::create_dir_all(parent)?;
            Ok(())
        }
        _ => Ok(()),
    }
}
/// Opens the queue file for reading and writing (creating it and its parent
/// directory if needed, never truncating), takes an exclusive `flock` on it,
/// and runs `f` with the open file.
///
/// No explicit unlock is issued; the file handle is dropped when this function
/// returns. NOTE(review): this presumably relies on the lock being released
/// when the descriptor is closed — confirm against the `flock` semantics.
///
/// # Errors
/// Fails when the directory cannot be created, the file cannot be opened, the
/// lock cannot be acquired, or `f` returns an error.
fn with_exclusive_lock<T, F>(&self, f: F) -> Result<T>
where
    F: FnOnce(&mut File) -> Result<T>,
{
    self.ensure_directory()?;
    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(false);
    let mut file = options
        .open(&self.path)
        .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
    flock(file.as_fd(), FlockOperation::LockExclusive)
        .context("Failed to acquire exclusive lock")?;
    f(&mut file)
}
/// Opens the existing queue file read-only, takes a shared `flock` on it, and
/// runs `f` with the open file. The file is not created when it is missing.
///
/// # Errors
/// Fails when the file cannot be opened, the lock cannot be acquired, or `f`
/// returns an error.
fn with_shared_lock<T, F>(&self, f: F) -> Result<T>
where
    F: FnOnce(&mut File) -> Result<T>,
{
    let opened = File::open(&self.path);
    let mut file = opened
        .with_context(|| format!("Failed to open queue file: {}", self.path.display()))?;
    flock(file.as_fd(), FlockOperation::LockShared).context("Failed to acquire shared lock")?;
    f(&mut file)
}
}
/// Collects the ids of all items whose stored status is anything other than `"closed"`.
fn open_ids_for_items(items: &[Item]) -> HashSet<String> {
    let mut open = HashSet::new();
    for item in items {
        if item.status != "closed" {
            open.insert(item.id.clone());
        }
    }
    open
}
/// Reads every item from `file`, starting at the beginning, one JSON object per line.
///
/// Blank lines are ignored. Lines that are not valid UTF-8 or do not parse as
/// an `Item` are skipped with a warning on stderr; `path` is used only in
/// those messages.
///
/// Any other read error stops the scan and returns the items read so far. A
/// persistent failure (for example when the path is a directory) is yielded by
/// `lines()` again on every call, so skipping it would loop forever.
///
/// If the file cannot be rewound, nothing is returned: a read that starts
/// mid-file would look like a complete queue and could be written back by a
/// later rewrite.
fn read_items(file: &mut File, path: &Path) -> Vec<Item> {
    if let Err(e) = file.seek(std::io::SeekFrom::Start(0)) {
        eprintln!("Warning: Could not rewind {}: {}", path.display(), e);
        return Vec::new();
    }
    let reader = BufReader::new(file);
    let mut items = Vec::new();
    for (line_num, line) in reader.lines().enumerate() {
        let line = match line {
            Ok(l) => l,
            // Invalid UTF-8: the offending line has been consumed, so the
            // scan can safely move on to the next one.
            Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
                eprintln!(
                    "Warning: Skipping corrupt line {} in {}: {}",
                    line_num + 1,
                    path.display(),
                    e
                );
                continue;
            }
            Err(e) => {
                eprintln!(
                    "Warning: Stopped reading {} at line {}: {}",
                    path.display(),
                    line_num + 1,
                    e
                );
                break;
            }
        };
        let trimmed = line.trim();
        if trimmed.is_empty() {
            continue;
        }
        match serde_json::from_str::<Item>(trimmed) {
            Ok(item) => items.push(item),
            Err(e) => {
                eprintln!(
                    "Warning: Skipping corrupt line {} in {}: {}",
                    line_num + 1,
                    path.display(),
                    e
                );
            }
        }
    }
    items
}
/// Replaces the entire contents of `file` with `items`, one JSON object per line.
///
/// All items are serialized into memory before the file is touched, and the
/// new contents go out in a single write. The file is therefore only truncated
/// once the replacement data is ready, and the window in which it is empty or
/// partial is one write call rather than one per item.
///
/// # Errors
/// Fails on any seek, truncate, or write error.
fn rewrite_items(file: &mut File, items: &[Item]) -> Result<()> {
    let mut payload = String::new();
    for item in items {
        payload.push_str(&item.to_json_string());
        payload.push('\n');
    }
    file.seek(std::io::SeekFrom::Start(0))?;
    file.set_len(0)?;
    file.write_all(payload.as_bytes())?;
    file.flush()?;
    Ok(())
}
/// Generates a random id over `ID_ALPHABET` that is not in `existing_ids`.
///
/// The length is chosen from the number of existing ids, and candidates are
/// redrawn until one is unused.
fn generate_id(existing_ids: &HashSet<String>) -> String {
    let length = generated_id_length(existing_ids.len());
    let mut candidate = nanoid::nanoid!(length, &ID_ALPHABET);
    while existing_ids.contains(&candidate) {
        candidate = nanoid::nanoid!(length, &ID_ALPHABET);
    }
    candidate
}
/// Returns the shortest id length, starting at `MIN_GENERATED_ID_LENGTH`, for
/// which `active_or_reserved_ids` is still below
/// `ID_SPACE_OCCUPANCY_THRESHOLD_PERCENT` percent of the id space.
fn generated_id_length(active_or_reserved_ids: usize) -> usize {
    let mut length = MIN_GENERATED_ID_LENGTH;
    while active_or_reserved_ids
        >= id_space_for_length(length) * ID_SPACE_OCCUPANCY_THRESHOLD_PERCENT / 100
    {
        length += 1;
    }
    length
}
/// Returns the number of distinct ids of `length` symbols over `ID_ALPHABET`,
/// i.e. the alphabet size raised to `length`.
///
/// The exponent is converted with a checked conversion rather than `as u32`.
/// A length that does not fit in `u32` therefore panics like any other
/// overflow, instead of being truncated to a small exponent and yielding a
/// wrong, too-small id space.
///
/// # Panics
/// Panics if the result does not fit in `usize`.
fn id_space_for_length(length: usize) -> usize {
    <u32 as std::convert::TryFrom<usize>>::try_from(length)
        .ok()
        .and_then(|exponent| ID_ALPHABET.len().checked_pow(exponent))
        .expect("generated ID space should fit in usize")
}
/// Formats the current UTC time as `YYYY-MM-DDTHH:MM:SS.mmmZ`.
fn now_iso8601() -> String {
    let now = chrono::Utc::now();
    format!("{}", now.format("%Y-%m-%dT%H:%M:%S%.3fZ"))
}
/// Succeeds when `priority` lies within `VALID_PRIORITY_RANGE`, otherwise
/// returns an error naming the rejected value.
fn validate_priority(priority: u8) -> Result<()> {
    if !VALID_PRIORITY_RANGE.contains(&priority) {
        anyhow::bail!("Invalid priority: {}. Valid: 0-4", priority);
    }
    Ok(())
}
/// Rejects a blocker list that contains the item's own id. Other ids are not
/// checked for existence here.
fn validate_blocked_by(item_id: &str, blocked_by: &[String]) -> Result<()> {
    for blocker in blocked_by {
        if blocker.as_str() == item_id {
            anyhow::bail!("Item cannot block itself: {}", item_id);
        }
    }
    Ok(())
}
/// Partial update for `Queue::update`: every field left as `None` keeps the
/// item's current value.
#[derive(Default)]
pub struct UpdateAttrs {
/// New stored status; must be listed in `VALID_STATUSES`.
pub status: Option<String>,
/// New title; there is no way to clear an existing title through this struct.
pub title: Option<String>,
/// New description; there is no way to clear an existing description through this struct.
pub description: Option<String>,
/// `Some(Some(p))` sets the priority (validated against `VALID_PRIORITY_RANGE`),
/// `Some(None)` clears it, `None` leaves it unchanged.
pub priority: Option<Option<u8>>,
/// Replacement metadata value.
pub metadata: Option<serde_json::Value>,
/// Replacement blocker list; must not contain the item's own id.
pub blocked_by: Option<Vec<String>>,
/// Replacement source list; every `type_` must be listed in `VALID_SOURCE_TYPES`.
pub sources: Option<Vec<Source>>,
}
#[cfg(test)]
mod tests {
    use super::{generated_id_length, id_space_for_length};

    /// Checks `generated_id_length(count) == expected` for each `(count, expected)` pair.
    fn check_lengths(cases: &[(usize, usize)]) {
        for &(count, expected) in cases.iter() {
            assert_eq!(generated_id_length(count), expected);
        }
    }

    /// The id space for a length is 32 raised to that length.
    #[test]
    fn id_space_matches_base32_lengths() {
        let cases: &[(usize, usize)] = &[
            (3, 32_768),
            (4, 1_048_576),
            (5, 33_554_432),
            (6, 1_073_741_824),
        ];
        for &(length, space) in cases.iter() {
            assert_eq!(id_space_for_length(length), space);
        }
    }

    /// Ids stay at three characters while the count is below 10% of 32^3.
    #[test]
    fn generated_ids_start_at_three_characters() {
        check_lengths(&[(0, 3), (3_275, 3)]);
    }

    /// Each count is the first value at which the length grows by one.
    #[test]
    fn generated_id_length_grows_at_documented_thresholds() {
        check_lengths(&[
            (3_276, 4),
            (104_857, 5),
            (3_355_443, 6),
            (107_374_182, 7),
        ]);
    }
}