use std::sync::Arc;
use darq_core::api::Api;
use darq_core::streaming::EventBroadcaster;
use darq_core::workflow::chain::WorkflowEngine;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use super::lifecycle;
use super::protocol::*;
static DAEMON_START: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
pub struct SocketServer {
api: Arc<Api>,
engine: Arc<WorkflowEngine>,
repo: Option<String>,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}
impl SocketServer {
pub fn new(
api: Arc<Api>,
engine: Arc<WorkflowEngine>,
) -> (Self, tokio::sync::broadcast::Receiver<()>) {
let _ = DAEMON_START.set(std::time::Instant::now());
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel(1);
(
Self {
api,
engine,
repo: None,
shutdown_tx,
},
shutdown_rx,
)
}
pub fn with_repo(mut self, repo: String) -> Self {
self.repo = Some(repo);
self
}
pub async fn run(self) -> anyhow::Result<()> {
let sock_path = lifecycle::socket_path();
if sock_path.exists() {
std::fs::remove_file(&sock_path)?;
}
lifecycle::ensure_home_dir()?;
let listener = UnixListener::bind(&sock_path)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
if let Ok(meta) = std::fs::metadata(&sock_path) {
let mut perms = meta.permissions();
perms.set_mode(0o600);
let _ = std::fs::set_permissions(&sock_path, perms);
}
}
tracing::info!(socket = %sock_path.display(), "daemon: listening");
lifecycle::write_pid_file()?;
tracing::info!(pid = std::process::id(), "daemon: started");
let mut shutdown_rx = self.shutdown_tx.subscribe();
loop {
tokio::select! {
result = listener.accept() => {
match result {
Ok((stream, _)) => {
let api = self.api.clone();
let engine = self.engine.clone();
let repo = self.repo.clone();
let broadcaster = self.api.broadcaster().clone();
let shutdown_tx = self.shutdown_tx.clone();
tokio::spawn(async move {
if let Err(e) = handle_connection(stream, &api, engine.clone(), repo, broadcaster, shutdown_tx).await {
tracing::warn!(error = %e, "daemon: connection error");
}
});
}
Err(e) => {
tracing::warn!(error = %e, "daemon: accept error");
}
}
}
_ = shutdown_rx.recv() => {
tracing::info!("daemon: shutting down");
break;
}
}
}
lifecycle::cleanup();
tracing::info!("daemon: stopped");
Ok(())
}
#[allow(dead_code)]
pub fn shutdown(&self) {
let _ = self.shutdown_tx.send(());
}
}
async fn handle_connection(
stream: tokio::net::UnixStream,
api: &Api,
engine: Arc<WorkflowEngine>,
repo: Option<String>,
broadcaster: EventBroadcaster,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
) -> anyhow::Result<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
loop {
line.clear();
tokio::select! {
result = reader.read_line(&mut line) => {
match result {
Ok(0) => break, Ok(_) => {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(req) = serde_json::from_str::<Request>(trimmed)
&& req.method == Method::Subscribe {
let ack = Response::success(req.id, serde_json::json!({"subscribed": true}));
let json = serde_json::to_string(&ack)?;
let _ = writer.write_all(json.as_bytes()).await;
let _ = writer.write_all(b"\n").await;
handle_event_stream(&mut reader, &mut writer, broadcaster).await;
return Ok(());
}
let response = dispatch(trimmed, api, &engine, repo.as_deref()).await;
let is_shutdown = matches!(&response, Response::Success { result, .. }
if result.get("shutdown").and_then(|v| v.as_bool()) == Some(true));
let json = serde_json::to_string(&response)?;
if writer.write_all(json.as_bytes()).await.is_err() {
break;
}
if writer.write_all(b"\n").await.is_err() {
break;
}
if is_shutdown {
let _ = shutdown_tx.send(());
return Ok(());
}
}
Err(e) => {
tracing::warn!(error = %e, "daemon: read error");
break;
}
}
}
}
}
Ok(())
}
async fn handle_event_stream(
reader: &mut BufReader<tokio::net::unix::OwnedReadHalf>,
writer: &mut tokio::net::unix::OwnedWriteHalf,
broadcaster: EventBroadcaster,
) {
let mut rx = broadcaster.subscribe_external().await;
let mut line = String::new();
loop {
tokio::select! {
event = rx.recv() => {
match event {
Some(run_event) => {
let wrapper = serde_json::json!({
"event": "run_event",
"data": run_event,
});
let json = match serde_json::to_string(&wrapper) {
Ok(j) => j,
Err(_) => continue,
};
if writer.write_all(json.as_bytes()).await.is_err() {
break;
}
if writer.write_all(b"\n").await.is_err() {
break;
}
}
None => break, }
}
result = reader.read_line(&mut line) => {
match result {
Ok(0) => break, Ok(_) => {
let trimmed = line.trim();
if trimmed == "unsubscribe" || trimmed.contains("\"method\":\"unsubscribe\"") {
break;
}
line.clear();
}
Err(_) => break,
}
}
}
}
tracing::info!("daemon: event stream ended");
}
async fn dispatch(
raw: &str,
api: &Api,
engine: &Arc<WorkflowEngine>,
repo: Option<&str>,
) -> Response {
let request: Request = match serde_json::from_str(raw) {
Ok(r) => r,
Err(e) => {
return Response::error("unknown".into(), format!("invalid request: {e}"));
}
};
let id = request.id.clone();
match request.method {
Method::Status => handle_status(id, api).await,
Method::RunList => handle_run_list(id, api, &request.params).await,
Method::RunShow => handle_run_show(id, api, &request.params).await,
Method::RunApprove => handle_run_approve(id, api, &request.params).await,
Method::RunCancel => handle_run_cancel(id, api, &request.params).await,
Method::Shutdown => handle_shutdown(id).await,
Method::WorkflowStart => handle_workflow_start(id, engine.clone(), request.params).await,
Method::WorkflowChain => handle_workflow_chain(id, engine.clone(), request.params).await,
Method::Sweep => handle_sweep(id, engine, repo, &request.params).await,
Method::Stats => handle_stats(id, api).await,
Method::Subscribe => {
Response::error(id, "subscribe must be the first command on a connection")
}
}
}
async fn handle_stats(id: String, api: &Api) -> Response {
match api.list_runs(None, None, Some(1000)).await {
Ok(runs) => {
let total = runs.len();
let completed = runs
.iter()
.filter(|r| matches!(r.status, darq_core::types::RunStatus::Completed))
.count();
let failed = runs
.iter()
.filter(|r| matches!(r.status, darq_core::types::RunStatus::Failed))
.count();
let first_pass_rate = if completed > 0 {
runs.iter()
.filter(|r| matches!(r.status, darq_core::types::RunStatus::Completed))
.filter(|r| r.patterns_effectiveness.iter().all(|p| p.avoided))
.count() as f64
/ completed as f64
} else {
0.0
};
let avg_sat = runs
.iter()
.filter_map(|r| r.sat_score)
.fold((0.0, 0usize), |(sum, count), score| {
(sum + score, count + 1)
});
let avg_sat_score = if avg_sat.1 > 0 {
avg_sat.0 / avg_sat.1 as f64
} else {
0.0
};
let mut pattern_stats: std::collections::HashMap<String, (usize, usize)> =
std::collections::HashMap::new();
for run in &runs {
for pe in &run.patterns_effectiveness {
let entry = pattern_stats.entry(pe.pattern_id.clone()).or_insert((0, 0));
if pe.avoided {
entry.0 += 1;
}
entry.1 += 1;
}
}
let patterns: Vec<serde_json::Value> = pattern_stats
.iter()
.map(|(pid, (ok, total))| {
serde_json::json!({
"pattern_id": pid,
"successes": ok,
"total": total,
"rate": if *total > 0 { *ok as f64 / *total as f64 } else { 0.0 }
})
})
.collect();
let uptime_secs = DAEMON_START
.get()
.map(|t| t.elapsed().as_secs())
.unwrap_or(0);
let blueprint_count: u64 = 0;
Response::success(
id,
serde_json::json!({
"total_runs": total,
"completed": completed,
"failed": failed,
"first_pass_rate": first_pass_rate,
"avg_sat_score": avg_sat_score,
"pattern_effectiveness": patterns,
"uptime_secs": uptime_secs,
"blueprint_count": blueprint_count,
}),
)
}
Err(e) => Response::error(id, format!("failed to query runs: {e}")),
}
}
async fn handle_status(id: String, api: &Api) -> Response {
match api.list_runs(None, None, None).await {
Ok(runs) => {
let mut counts: std::collections::HashMap<String, usize> =
std::collections::HashMap::new();
for run in &runs {
*counts.entry(run.status.to_string()).or_default() += 1;
}
Response::success(
id,
serde_json::json!({
"counts": counts,
"total": runs.len(),
}),
)
}
Err(e) => Response::error(id, format!("failed to list runs: {e}")),
}
}
async fn handle_run_list(id: String, api: &Api, params: &serde_json::Value) -> Response {
let status = params.get("status").and_then(|v| v.as_str());
let milestone = params.get("milestone").and_then(|v| v.as_str());
match api.list_runs(status, milestone, None).await {
Ok(runs) => Response::success(id, serde_json::json!({ "runs": runs })),
Err(e) => Response::error(id, format!("failed to list runs: {e}")),
}
}
async fn handle_run_show(id: String, api: &Api, params: &serde_json::Value) -> Response {
let run_id = match params.get("id").and_then(|v| v.as_str()) {
Some(id) => id,
None => return Response::error(id, "missing 'id' parameter"),
};
match api.get_run(run_id).await {
Ok(Some(run)) => Response::success(id, serde_json::json!({ "run": run })),
Ok(None) => Response::error(id, format!("run not found: {run_id}")),
Err(e) => Response::error(id, format!("failed to get run: {e}")),
}
}
async fn handle_run_approve(id: String, api: &Api, params: &serde_json::Value) -> Response {
let run_id = match params.get("id").and_then(|v| v.as_str()) {
Some(id) => id,
None => return Response::error(id, "missing 'id' parameter"),
};
match api
.approve_run(run_id, Some("daemon-client".into()), None)
.await
{
Ok(run) => Response::success(id, serde_json::json!({ "run": run })),
Err(e) => Response::error(id, format!("failed to approve run: {e}")),
}
}
async fn handle_run_cancel(id: String, api: &Api, params: &serde_json::Value) -> Response {
let run_id = match params.get("id").and_then(|v| v.as_str()) {
Some(id) => id,
None => return Response::error(id, "missing 'id' parameter"),
};
let reason = params
.get("reason")
.and_then(|v| v.as_str())
.map(String::from);
match api.cancel_run(run_id, reason).await {
Ok(run) => Response::success(id, serde_json::json!({ "run": run })),
Err(e) => Response::error(id, format!("failed to cancel run: {e}")),
}
}
async fn handle_shutdown(id: String) -> Response {
Response::success(id, serde_json::json!({ "shutdown": true }))
}
async fn handle_workflow_start(
id: String,
engine: Arc<WorkflowEngine>,
params: serde_json::Value,
) -> Response {
use darq_core::workflow::chain::ChainConfig;
let kind_str = match params.get("kind").and_then(|v| v.as_str()) {
Some(k) => k,
None => return Response::error(id, "missing 'kind' parameter"),
};
let issue = match params.get("issue").and_then(|v| v.as_u64()) {
Some(i) => i,
None => return Response::error(id, "missing 'issue' parameter"),
};
let kind = match parse_workflow_kind(kind_str) {
Some(k) => k,
None => return Response::error(id, format!("unknown workflow kind: {kind_str}")),
};
let chain = ChainConfig {
start_workflow: kind,
max_steps: 1,
pause_for_approval: false,
approval_gates: vec![],
};
tokio::spawn(async move {
match engine.execute_chain(issue, chain).await {
Ok(result) => {
tracing::info!(
run_id = %result.run_id,
status = ?result.status,
"workflow_start completed"
);
}
Err(e) => {
tracing::error!(error = %e, "workflow_start failed");
}
}
});
Response::success(id, serde_json::json!({ "queued": true }))
}
async fn handle_workflow_chain(
id: String,
engine: Arc<WorkflowEngine>,
params: serde_json::Value,
) -> Response {
use darq_core::workflow::chain::ChainConfig;
let kind_str = match params.get("start").and_then(|v| v.as_str()) {
Some(k) => k,
None => return Response::error(id, "missing 'start' parameter"),
};
let issue = match params.get("issue").and_then(|v| v.as_u64()) {
Some(i) => i,
None => return Response::error(id, "missing 'issue' parameter"),
};
let max_steps = params
.get("max_steps")
.and_then(|v| v.as_u64())
.unwrap_or(7) as u32;
let pause = params
.get("pause_for_approval")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let kind = match parse_workflow_kind(kind_str) {
Some(k) => k,
None => return Response::error(id, format!("unknown workflow kind: {kind_str}")),
};
let config = darq_core::config::load_or_default();
let approval_gates: Vec<darq_core::workflow::WorkflowKind> = config
.pipeline
.approval_gates
.iter()
.filter_map(|s| parse_workflow_kind(s))
.collect();
let chain = ChainConfig {
start_workflow: kind,
max_steps,
pause_for_approval: pause,
approval_gates,
};
let api_for_error = engine.api().clone();
tokio::spawn(async move {
match engine.execute_chain(issue, chain).await {
Ok(result) => {
tracing::info!(
run_id = %result.run_id,
status = ?result.status,
steps = result.steps.len(),
"workflow_chain completed"
);
if result.status == darq_core::workflow::chain::ChainStatus::Completed {
let _ = api_for_error.complete_run(&result.run_id).await;
} else if result.status == darq_core::workflow::chain::ChainStatus::Failed
&& let Some(ref error) = result.error
{
let _ = api_for_error.fail_run(&result.run_id, error.clone()).await;
}
}
Err(e) => {
tracing::error!(error = %e, "workflow_chain failed");
}
}
});
Response::success(id, serde_json::json!({ "queued": true }))
}
fn parse_workflow_kind(s: &str) -> Option<darq_core::workflow::WorkflowKind> {
use darq_core::workflow::WorkflowKind;
match s {
"plan_issue" | "PlanIssue" => Some(WorkflowKind::PlanIssue),
"implement_issue" | "ImplementIssue" => Some(WorkflowKind::ImplementIssue),
"review_pr" | "ReviewPr" => Some(WorkflowKind::ReviewPr),
"fix_review" | "FixReview" => Some(WorkflowKind::FixReview),
"merge_pr" | "MergePr" => Some(WorkflowKind::MergePr),
"sat_verify" | "SatVerify" => Some(WorkflowKind::SatVerify),
"learn_update" | "LearnUpdate" => Some(WorkflowKind::LearnUpdate),
_ => None,
}
}
async fn handle_sweep(
id: String,
engine: &WorkflowEngine,
repo: Option<&str>,
params: &serde_json::Value,
) -> Response {
use darq_core::sweep::execute_sweep;
use darq_core::workflow::github_service::{GitHubClient, RepoRef};
let milestone_name = match params.get("milestone").and_then(|v| v.as_str()) {
Some(n) => n,
None => return Response::error(id, "missing 'milestone' parameter"),
};
let dry_run = params
.get("dry_run")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let repo_str = match repo {
Some(r) => r,
None => return Response::error(id, "no repo configured in darq.yaml — set project.repo"),
};
let repo_ref = match RepoRef::parse(repo_str) {
Ok(r) => r,
Err(e) => return Response::error(id, format!("invalid repo config: {e}")),
};
let gh = GitHubClient::new(repo_ref);
match execute_sweep(&gh, engine, milestone_name, dry_run).await {
Ok(result) => Response::success(
id,
serde_json::json!({
"milestone": result.milestone,
"milestone_number": result.milestone_number,
"dry_run": result.dry_run,
"total": result.total_issues,
"completed": result.completed,
"skipped": result.skipped,
"errored": result.errored,
"duration_ms": result.total_duration_ms,
"issues": result.issues.iter().map(|i| serde_json::json!({
"number": i.issue_number,
"title": i.issue_title,
"action": i.action.to_string(),
"executed": i.executed,
"error": i.error,
})).collect::<Vec<_>>(),
}),
),
Err(e) => Response::error(id, format!("sweep failed: {e}")),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::daemon::protocol::{Method, Request};
use darq_core::api::Api;
use darq_core::config::Config;
use darq_core::workflow::WorkflowRegistry;
use darq_core::workflow::chain::WorkflowEngine;
use std::sync::Arc;
async fn make_test_engine() -> Arc<WorkflowEngine> {
let api = Arc::new(Api::in_memory().expect("in-memory API"));
let config = Config::default();
let tmp = tempfile::tempdir().expect("temp dir");
let registry = WorkflowRegistry::empty();
Arc::new(WorkflowEngine::with_registry(
api,
registry,
config,
tmp.path().to_path_buf(),
))
}
#[tokio::test]
async fn test_dispatch_status() {
let engine = make_test_engine().await;
let api = engine.api().clone();
let req = Request {
id: "t1".into(),
method: Method::Status,
params: serde_json::json!({}),
};
let json = serde_json::to_string(&req).unwrap();
let resp = dispatch(&json, &api, &engine, None).await;
match resp {
Response::Success { id, result } => {
assert_eq!(id, "t1");
assert!(result.get("total").is_some());
}
_ => panic!("expected success"),
}
}
#[tokio::test]
async fn test_dispatch_run_list_empty() {
let engine = make_test_engine().await;
let api = engine.api().clone();
let req = Request {
id: "t2".into(),
method: Method::RunList,
params: serde_json::json!({}),
};
let json = serde_json::to_string(&req).unwrap();
let resp = dispatch(&json, &api, &engine, None).await;
match resp {
Response::Success { id, result } => {
assert_eq!(id, "t2");
let runs = result.get("runs").and_then(|v| v.as_array()).unwrap();
assert!(runs.is_empty());
}
_ => panic!("expected success"),
}
}
#[tokio::test]
async fn test_dispatch_workflow_chain_queues() {
let engine = make_test_engine().await;
let api = engine.api().clone();
let req = Request {
id: "t3".into(),
method: Method::WorkflowChain,
params: serde_json::json!({
"start": "plan_issue",
"issue": 42,
"max_steps": 1,
"pause_for_approval": false,
}),
};
let json = serde_json::to_string(&req).unwrap();
let resp = dispatch(&json, &api, &engine, None).await;
match resp {
Response::Success { id, result } => {
assert_eq!(id, "t3");
assert!(
result
.get("queued")
.and_then(|v| v.as_bool())
.unwrap_or(false)
);
}
_ => panic!("expected success"),
}
}
#[tokio::test]
async fn test_dispatch_invalid_json() {
let engine = make_test_engine().await;
let api = engine.api().clone();
let resp = dispatch("not valid json", &api, &engine, None).await;
match resp {
Response::Error { error, .. } => {
assert!(error.contains("invalid request"));
}
_ => panic!("expected error"),
}
}
#[tokio::test]
async fn test_dispatch_subscribe_error() {
let engine = make_test_engine().await;
let api = engine.api().clone();
let req = Request {
id: "t5".into(),
method: Method::Subscribe,
params: serde_json::json!({}),
};
let json = serde_json::to_string(&req).unwrap();
let resp = dispatch(&json, &api, &engine, None).await;
match resp {
Response::Error { id, error } => {
assert_eq!(id, "t5");
assert!(error.contains("subscribe must be the first command"));
}
_ => panic!("expected error"),
}
}
#[tokio::test]
async fn test_dispatch_workflow_chain_missing_params() {
let engine = make_test_engine().await;
let api = engine.api().clone();
let req = Request {
id: "t6".into(),
method: Method::WorkflowChain,
params: serde_json::json!({}),
};
let json = serde_json::to_string(&req).unwrap();
let resp = dispatch(&json, &api, &engine, None).await;
match resp {
Response::Error { error, .. } => {
assert!(error.contains("missing"));
}
_ => panic!("expected error"),
}
}
}