use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
use anyhow::{Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, warn};
use super::config::DaemonConfig;
use super::protocol::{
CreateArgs, DeleteArgs, DepArgs, HealthInfo, LabelArgs, ListArgs,
Operation, Request, Response, ShowArgs, UpdateArgs, CommentArgs,
PROTOCOL_VERSION,
};
use super::{get_pid_path, get_socket_path};
use crate::idgen::generate_id;
use crate::storage::{SqliteStorage, Storage};
use crate::types::{Dependency, DependencyType, Issue, IssueFilter, IssueType, Status};
pub struct Daemon {
storage: Arc<SqliteStorage>,
beads_dir: PathBuf,
socket_path: PathBuf,
config: DaemonConfig,
shutdown: AtomicBool,
start_time: Instant,
connection_count: AtomicU64,
connection_semaphore: Arc<Semaphore>,
}
impl Daemon {
pub fn new(
storage: SqliteStorage,
beads_dir: PathBuf,
config: DaemonConfig,
) -> Self {
let socket_path = get_socket_path(&beads_dir);
let max_conns = config.max_connections;
Self {
storage: Arc::new(storage),
beads_dir,
socket_path,
config,
shutdown: AtomicBool::new(false),
start_time: Instant::now(),
connection_count: AtomicU64::new(0),
connection_semaphore: Arc::new(Semaphore::new(max_conns)),
}
}
pub async fn start(&self) -> Result<()> {
if self.socket_path.exists() {
std::fs::remove_file(&self.socket_path)
.context("Failed to remove stale socket")?;
}
if let Some(parent) = self.socket_path.parent() {
std::fs::create_dir_all(parent)
.context("Failed to create socket directory")?;
}
let pid_path = get_pid_path(&self.beads_dir);
std::fs::write(&pid_path, std::process::id().to_string())
.context("Failed to write PID file")?;
let listener = UnixListener::bind(&self.socket_path)
.context("Failed to bind to socket")?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&self.socket_path, std::fs::Permissions::from_mode(0o600))
.context("Failed to set socket permissions")?;
}
info!("Daemon started on {:?}", self.socket_path);
while !self.shutdown.load(Ordering::Acquire) {
tokio::select! {
result = listener.accept() => {
match result {
Ok((stream, _)) => {
let permit = match self.connection_semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
warn!("Connection limit reached, rejecting");
continue;
}
};
self.connection_count.fetch_add(1, Ordering::Relaxed);
let storage = self.storage.clone();
let beads_dir = self.beads_dir.clone();
let start_time = self.start_time;
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, storage, beads_dir, start_time).await {
debug!("Connection error: {}", e);
}
drop(permit);
});
}
Err(e) => {
error!("Accept error: {}", e);
}
}
}
_ = tokio::signal::ctrl_c() => {
info!("Received shutdown signal");
self.shutdown();
break;
}
}
}
self.cleanup()?;
Ok(())
}
pub fn shutdown(&self) {
self.shutdown.store(true, Ordering::Release);
}
fn cleanup(&self) -> Result<()> {
if self.socket_path.exists() {
std::fs::remove_file(&self.socket_path)?;
}
let pid_path = get_pid_path(&self.beads_dir);
if pid_path.exists() {
std::fs::remove_file(&pid_path)?;
}
self.storage.close()?;
info!("Daemon shut down cleanly");
Ok(())
}
}
async fn handle_connection(
stream: UnixStream,
storage: Arc<SqliteStorage>,
beads_dir: PathBuf,
start_time: Instant,
) -> Result<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
while reader.read_line(&mut line).await? > 0 {
let request: Request = match serde_json::from_str(&line) {
Ok(req) => req,
Err(e) => {
let response = Response::error(format!("Invalid request: {}", e));
send_response(&mut writer, &response).await?;
line.clear();
continue;
}
};
debug!("Received request: {:?}", request.operation);
let response = handle_request(&request, &storage, &beads_dir, start_time).await
.with_request_id(&request.request_id);
send_response(&mut writer, &response).await?;
line.clear();
if request.operation == Operation::Shutdown {
break;
}
}
Ok(())
}
/// Writes `response` to `writer` as one JSON line.
///
/// The payload and its terminating newline are assembled into a single
/// buffer and written with one `write_all`, so the socket is not written to
/// separately for the newline.
///
/// # Errors
///
/// Returns an error if the response cannot be serialized or the write fails.
async fn send_response(
    writer: &mut tokio::net::unix::OwnedWriteHalf,
    response: &Response,
) -> Result<()> {
    let mut frame = serde_json::to_vec(response)?;
    frame.push(b'\n');
    writer.write_all(&frame).await?;
    writer.flush().await?;
    Ok(())
}
async fn handle_request(
request: &Request,
storage: &SqliteStorage,
beads_dir: &PathBuf,
start_time: Instant,
) -> Response {
match request.operation {
Operation::Ping => Response::ok(),
Operation::Health => {
let info = HealthInfo {
pid: std::process::id(),
uptime_secs: start_time.elapsed().as_secs(),
version: crate::VERSION.to_string(),
protocol_version: PROTOCOL_VERSION.to_string(),
compatible: true,
database_path: beads_dir.join("beads.db").to_string_lossy().to_string(),
};
Response::success(info)
}
Operation::Status => {
match storage.get_statistics() {
Ok(stats) => Response::success(stats),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Create => {
let args: CreateArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
let id = if let Some(ref parent_id) = args.parent_id {
match storage.next_child_counter(parent_id) {
Ok(counter) => crate::idgen::generate_child_id(parent_id, counter),
Err(e) => return Response::error(e.to_string()),
}
} else {
generate_id("bd")
};
let issue_type = args.issue_type
.as_ref()
.and_then(|s| s.parse().ok())
.unwrap_or(IssueType::Task);
let mut issue = Issue::new(&id, &args.title, &request.actor)
.with_type(issue_type);
if let Some(desc) = args.description {
issue.description = Some(desc);
}
if let Some(priority) = args.priority {
issue.priority = priority.clamp(0, 4);
}
if let Some(assignee) = args.assignee {
issue.assignee = Some(assignee);
}
issue.update_content_hash();
match storage.create_issue(&issue) {
Ok(()) => {
for label in &args.labels {
let _ = storage.add_label(&id, label);
}
if let Some(ref parent_id) = args.parent_id {
let dep = Dependency::parent_child(&id, parent_id)
.with_creator(&request.actor);
let _ = storage.add_dependency(&dep);
}
Response::success(&issue)
}
Err(e) => Response::error(e.to_string()),
}
}
Operation::Show => {
let args: ShowArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.get_issue(&args.id) {
Ok(Some(issue)) => Response::success(&issue),
Ok(None) => Response::error(format!("Issue not found: {}", args.id)),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Update => {
let args: UpdateArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.get_issue(&args.id) {
Ok(Some(mut issue)) => {
if let Some(title) = args.title {
issue.title = title;
}
if let Some(description) = args.description {
issue.description = Some(description);
}
if let Some(status) = args.status {
if let Ok(s) = status.parse::<Status>() {
issue.status = s;
}
}
if let Some(priority) = args.priority {
issue.priority = priority.clamp(0, 4);
}
if let Some(assignee) = args.assignee {
issue.assignee = Some(assignee);
}
issue.touch();
issue.update_content_hash();
match storage.update_issue(&issue) {
Ok(()) => Response::success(&issue),
Err(e) => Response::error(e.to_string()),
}
}
Ok(None) => Response::error(format!("Issue not found: {}", args.id)),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Delete => {
let args: DeleteArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.delete_issue(&args.id, &request.actor, args.reason.as_deref()) {
Ok(()) => Response::ok(),
Err(e) => Response::error(e.to_string()),
}
}
Operation::List => {
let args: ListArgs = serde_json::from_value(request.args.clone())
.unwrap_or_default();
let mut filter = IssueFilter::new();
if let Some(ref status) = args.status {
if let Ok(s) = status.parse::<Status>() {
filter.status = Some(s);
}
}
if let Some(ref issue_type) = args.issue_type {
if let Ok(t) = issue_type.parse::<IssueType>() {
filter.issue_type = Some(t);
}
}
if let Some(ref assignee) = args.assignee {
filter.assignee = Some(assignee.clone());
}
if let Some(ref label) = args.label {
filter.label = Some(label.clone());
}
if let Some(limit) = args.limit {
filter.limit = Some(limit);
}
if !args.include_closed {
filter.statuses = Some(vec![Status::Open, Status::InProgress, Status::Blocked]);
}
match storage.search_issues(&filter) {
Ok(issues) => Response::success(&issues),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Count => {
match storage.get_statistics() {
Ok(stats) => Response::success(stats.total_issues),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Ready => {
match storage.get_ready_work() {
Ok(issues) => Response::success(&issues),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Blocked => {
match storage.get_blocked_issues() {
Ok(blocked) => Response::success(&blocked),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Stale => {
let filter = IssueFilter::active();
match storage.search_issues(&filter) {
Ok(issues) => {
let now = chrono::Utc::now();
let stale: Vec<_> = issues.into_iter()
.filter(|i| (now - i.updated_at).num_days() > 7)
.collect();
Response::success(&stale)
}
Err(e) => Response::error(e.to_string()),
}
}
Operation::DepAdd => {
let args: DepArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
let dep_type = args.dep_type
.as_ref()
.and_then(|s| s.parse().ok())
.unwrap_or(DependencyType::Blocks);
let dep = Dependency {
issue_id: args.issue_id,
depends_on_id: args.depends_on_id,
dep_type,
created_at: chrono::Utc::now(),
created_by: Some(request.actor.clone()),
metadata: None,
thread_id: None,
};
match storage.add_dependency(&dep) {
Ok(()) => Response::ok(),
Err(e) => Response::error(e.to_string()),
}
}
Operation::DepRemove => {
let args: DepArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.remove_dependency(&args.issue_id, &args.depends_on_id) {
Ok(()) => Response::ok(),
Err(e) => Response::error(e.to_string()),
}
}
Operation::DepList => {
let args: ShowArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.get_dependencies(&args.id) {
Ok(deps) => Response::success(&deps),
Err(e) => Response::error(e.to_string()),
}
}
Operation::LabelAdd => {
let args: LabelArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.add_label(&args.issue_id, &args.label) {
Ok(()) => Response::ok(),
Err(e) => Response::error(e.to_string()),
}
}
Operation::LabelRemove => {
let args: LabelArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.remove_label(&args.issue_id, &args.label) {
Ok(()) => Response::ok(),
Err(e) => Response::error(e.to_string()),
}
}
Operation::LabelList => {
let args: ShowArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.get_labels(&args.id) {
Ok(labels) => Response::success(&labels),
Err(e) => Response::error(e.to_string()),
}
}
Operation::CommentAdd => {
let args: CommentArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.add_comment(&args.issue_id, &request.actor, &args.text) {
Ok(id) => Response::success(id),
Err(e) => Response::error(e.to_string()),
}
}
Operation::CommentList => {
let args: ShowArgs = match serde_json::from_value(request.args.clone()) {
Ok(args) => args,
Err(e) => return Response::error(format!("Invalid arguments: {}", e)),
};
match storage.get_comments(&args.id) {
Ok(comments) => Response::success(&comments),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Stats => {
match storage.get_statistics() {
Ok(stats) => Response::success(&stats),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Compact | Operation::CompactStats => {
Response::error("Compaction not yet implemented")
}
Operation::Export | Operation::Import => {
Response::error("Import/Export not yet implemented via daemon")
}
Operation::Config => {
match storage.get_all_config() {
Ok(config) => Response::success(&config),
Err(e) => Response::error(e.to_string()),
}
}
Operation::Shutdown => {
Response::ok()
}
}
}