beads_rs/daemon/
ipc.rs

1//! IPC protocol types and codec.
2//!
3//! Protocol: newline-delimited JSON (ndjson) over Unix socket.
4//!
5//! Request format: `{"op": "create", ...}\n`
6//! Response format: `{"ok": ...}\n` or `{"err": {"code": "...", "message": "..."}}\n`
7
8use std::fs;
9use std::fs::OpenOptions;
10use std::io::{BufRead, BufReader, Write};
11use std::os::unix::fs::PermissionsExt;
12use std::os::unix::net::UnixStream;
13use std::path::PathBuf;
14use std::process::{Command, Stdio};
15use std::time::{Duration, SystemTime};
16
17use serde::{Deserialize, Serialize};
18use thiserror::Error;
19
20use super::ops::{BeadOp, BeadPatch, OpError, OpResult};
21use super::query::{Filters, Query, QueryResult};
22use crate::core::{BeadId, BeadType, CoreError, DepKind, InvalidId, Priority};
23use crate::error::{Effect, Transience};
24
/// Version number of the IPC wire protocol spoken by this build.
///
/// Clients compare this against the `protocol_version` a daemon reports and
/// treat any difference as a version mismatch.
pub const IPC_PROTOCOL_VERSION: u32 = 1;
26
27// =============================================================================
28// Request - All IPC requests
29// =============================================================================
30
31/// IPC request (mutation or query).
32#[derive(Debug, Clone, Serialize, Deserialize)]
33#[serde(tag = "op", rename_all = "snake_case")]
34pub enum Request {
35    // === Mutations ===
36    /// Create a new bead.
37    Create {
38        repo: PathBuf,
39        #[serde(default)]
40        id: Option<String>,
41        #[serde(default)]
42        parent: Option<String>,
43        title: String,
44        #[serde(rename = "type")]
45        bead_type: BeadType,
46        priority: Priority,
47        #[serde(default)]
48        description: Option<String>,
49        #[serde(default)]
50        design: Option<String>,
51        #[serde(default)]
52        acceptance_criteria: Option<String>,
53        #[serde(default)]
54        assignee: Option<String>,
55        #[serde(default)]
56        external_ref: Option<String>,
57        #[serde(default)]
58        estimated_minutes: Option<u32>,
59        #[serde(default)]
60        labels: Vec<String>,
61        #[serde(default)]
62        dependencies: Vec<String>,
63    },
64
65    /// Update an existing bead.
66    Update {
67        repo: PathBuf,
68        id: String,
69        patch: BeadPatch,
70        #[serde(default)]
71        cas: Option<String>,
72    },
73
74    /// Close a bead.
75    Close {
76        repo: PathBuf,
77        id: String,
78        #[serde(default)]
79        reason: Option<String>,
80        #[serde(default)]
81        on_branch: Option<String>,
82    },
83
84    /// Reopen a closed bead.
85    Reopen { repo: PathBuf, id: String },
86
87    /// Delete a bead (soft delete).
88    Delete {
89        repo: PathBuf,
90        id: String,
91        #[serde(default)]
92        reason: Option<String>,
93    },
94
95    /// Add a dependency.
96    AddDep {
97        repo: PathBuf,
98        from: String,
99        to: String,
100        kind: DepKind,
101    },
102
103    /// Remove a dependency.
104    RemoveDep {
105        repo: PathBuf,
106        from: String,
107        to: String,
108        kind: DepKind,
109    },
110
111    /// Add a note.
112    AddNote {
113        repo: PathBuf,
114        id: String,
115        content: String,
116    },
117
118    /// Claim a bead.
119    Claim {
120        repo: PathBuf,
121        id: String,
122        #[serde(default = "default_lease_secs")]
123        lease_secs: u64,
124    },
125
126    /// Release a claim.
127    Unclaim { repo: PathBuf, id: String },
128
129    /// Extend a claim.
130    ExtendClaim {
131        repo: PathBuf,
132        id: String,
133        lease_secs: u64,
134    },
135
136    // === Queries ===
137    /// Get a single bead.
138    Show { repo: PathBuf, id: String },
139
140    /// List beads.
141    List {
142        repo: PathBuf,
143        #[serde(default)]
144        filters: Filters,
145    },
146
147    /// Get ready beads.
148    Ready {
149        repo: PathBuf,
150        #[serde(default)]
151        limit: Option<usize>,
152    },
153
154    /// Get dependency tree.
155    DepTree { repo: PathBuf, id: String },
156
157    /// Get dependencies.
158    Deps { repo: PathBuf, id: String },
159
160    /// Get notes.
161    Notes { repo: PathBuf, id: String },
162
163    /// Get blocked issues.
164    Blocked { repo: PathBuf },
165
166    /// Get stale issues.
167    Stale {
168        repo: PathBuf,
169        #[serde(default)]
170        days: u32,
171        #[serde(default)]
172        status: Option<String>,
173        #[serde(default)]
174        limit: Option<usize>,
175    },
176
177    /// Count issues matching filters.
178    Count {
179        repo: PathBuf,
180        #[serde(default)]
181        filters: Filters,
182        #[serde(default)]
183        group_by: Option<String>,
184    },
185
186    /// Show deleted (tombstoned) issues.
187    Deleted {
188        repo: PathBuf,
189        #[serde(default)]
190        since_ms: Option<u64>,
191        #[serde(default)]
192        id: Option<String>,
193    },
194
195    /// Epic completion status.
196    EpicStatus {
197        repo: PathBuf,
198        #[serde(default)]
199        eligible_only: bool,
200    },
201
202    // === Control ===
203    /// Force reload state from git (invalidates cache).
204    /// Use after external changes to refs/heads/beads/store (e.g., migration).
205    Refresh { repo: PathBuf },
206
207    /// Force sync now.
208    Sync { repo: PathBuf },
209
210    /// Wait until repo is clean (debounced sync flushed).
211    SyncWait { repo: PathBuf },
212
213    /// Initialize beads ref.
214    Init { repo: PathBuf },
215
216    /// Get sync status.
217    Status { repo: PathBuf },
218
219    /// Validate state.
220    Validate { repo: PathBuf },
221
222    /// Ping (health check).
223    Ping,
224
225    /// Shutdown daemon.
226    Shutdown,
227}
228
/// Serde default for `Request::Claim::lease_secs`: one hour, in seconds.
fn default_lease_secs() -> u64 {
    const SECS_PER_HOUR: u64 = 60 * 60;
    SECS_PER_HOUR
}
232
233impl Request {
234    /// Convert to BeadOp if this is a mutation request.
235    pub fn to_op(&self) -> Result<Option<BeadOp>, IpcError> {
236        match self {
237            Request::Create {
238                repo,
239                id,
240                parent,
241                title,
242                bead_type,
243                priority,
244                description,
245                design,
246                acceptance_criteria,
247                assignee,
248                external_ref,
249                estimated_minutes,
250                labels,
251                dependencies,
252            } => Ok(Some(BeadOp::Create {
253                repo: repo.clone(),
254                id: id.clone(),
255                parent: parent.clone(),
256                title: title.clone(),
257                bead_type: *bead_type,
258                priority: *priority,
259                description: description.clone(),
260                design: design.clone(),
261                acceptance_criteria: acceptance_criteria.clone(),
262                assignee: assignee.clone(),
263                external_ref: external_ref.clone(),
264                estimated_minutes: *estimated_minutes,
265                labels: labels.clone(),
266                dependencies: dependencies.clone(),
267            })),
268
269            Request::Update {
270                repo,
271                id,
272                patch,
273                cas,
274            } => {
275                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
276                Ok(Some(BeadOp::Update {
277                    repo: repo.clone(),
278                    id,
279                    patch: patch.clone(),
280                    cas: cas.clone(),
281                }))
282            }
283
284            Request::Close {
285                repo,
286                id,
287                reason,
288                on_branch,
289            } => {
290                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
291                Ok(Some(BeadOp::Close {
292                    repo: repo.clone(),
293                    id,
294                    reason: reason.clone(),
295                    on_branch: on_branch.clone(),
296                }))
297            }
298
299            Request::Reopen { repo, id } => {
300                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
301                Ok(Some(BeadOp::Reopen {
302                    repo: repo.clone(),
303                    id,
304                }))
305            }
306
307            Request::Delete { repo, id, reason } => {
308                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
309                Ok(Some(BeadOp::Delete {
310                    repo: repo.clone(),
311                    id,
312                    reason: reason.clone(),
313                }))
314            }
315
316            Request::AddDep {
317                repo,
318                from,
319                to,
320                kind,
321            } => {
322                let from = BeadId::parse(from).map_err(map_core_invalid_id)?;
323                let to = BeadId::parse(to).map_err(map_core_invalid_id)?;
324                Ok(Some(BeadOp::AddDep {
325                    repo: repo.clone(),
326                    from,
327                    to,
328                    kind: *kind,
329                }))
330            }
331
332            Request::RemoveDep {
333                repo,
334                from,
335                to,
336                kind,
337            } => {
338                let from = BeadId::parse(from).map_err(map_core_invalid_id)?;
339                let to = BeadId::parse(to).map_err(map_core_invalid_id)?;
340                Ok(Some(BeadOp::RemoveDep {
341                    repo: repo.clone(),
342                    from,
343                    to,
344                    kind: *kind,
345                }))
346            }
347
348            Request::AddNote { repo, id, content } => {
349                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
350                Ok(Some(BeadOp::AddNote {
351                    repo: repo.clone(),
352                    id,
353                    content: content.clone(),
354                }))
355            }
356
357            Request::Claim {
358                repo,
359                id,
360                lease_secs,
361            } => {
362                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
363                Ok(Some(BeadOp::Claim {
364                    repo: repo.clone(),
365                    id,
366                    lease_secs: *lease_secs,
367                }))
368            }
369
370            Request::Unclaim { repo, id } => {
371                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
372                Ok(Some(BeadOp::Unclaim {
373                    repo: repo.clone(),
374                    id,
375                }))
376            }
377
378            Request::ExtendClaim {
379                repo,
380                id,
381                lease_secs,
382            } => {
383                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
384                Ok(Some(BeadOp::ExtendClaim {
385                    repo: repo.clone(),
386                    id,
387                    lease_secs: *lease_secs,
388                }))
389            }
390
391            // Not mutations
392            _ => Ok(None),
393        }
394    }
395
396    /// Convert to Query if this is a query request.
397    pub fn to_query(&self) -> Result<Option<Query>, IpcError> {
398        match self {
399            Request::Show { repo, id } => {
400                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
401                Ok(Some(Query::Show {
402                    repo: repo.clone(),
403                    id,
404                }))
405            }
406
407            Request::List { repo, filters } => Ok(Some(Query::List {
408                repo: repo.clone(),
409                filters: filters.clone(),
410            })),
411
412            Request::Ready { repo, limit } => Ok(Some(Query::Ready {
413                repo: repo.clone(),
414                limit: *limit,
415            })),
416
417            Request::DepTree { repo, id } => {
418                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
419                Ok(Some(Query::DepTree {
420                    repo: repo.clone(),
421                    id,
422                }))
423            }
424
425            Request::Deps { repo, id } => {
426                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
427                Ok(Some(Query::Deps {
428                    repo: repo.clone(),
429                    id,
430                }))
431            }
432
433            Request::Notes { repo, id } => {
434                let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
435                Ok(Some(Query::Notes {
436                    repo: repo.clone(),
437                    id,
438                }))
439            }
440
441            Request::Status { repo } => Ok(Some(Query::Status { repo: repo.clone() })),
442
443            Request::Blocked { repo } => Ok(Some(Query::Blocked { repo: repo.clone() })),
444
445            Request::Stale {
446                repo,
447                days,
448                status,
449                limit,
450            } => Ok(Some(Query::Stale {
451                repo: repo.clone(),
452                days: *days,
453                status: status.clone(),
454                limit: *limit,
455            })),
456
457            Request::Count {
458                repo,
459                filters,
460                group_by,
461            } => Ok(Some(Query::Count {
462                repo: repo.clone(),
463                filter: filters.clone(),
464                group_by: group_by.clone(),
465            })),
466
467            Request::Deleted { repo, since_ms, id } => Ok(Some(Query::Deleted {
468                repo: repo.clone(),
469                since_ms: *since_ms,
470                id: id
471                    .as_ref()
472                    .map(|s| BeadId::parse(s).map_err(map_core_invalid_id))
473                    .transpose()?,
474            })),
475
476            Request::EpicStatus {
477                repo,
478                eligible_only,
479            } => Ok(Some(Query::EpicStatus {
480                repo: repo.clone(),
481                eligible_only: *eligible_only,
482            })),
483
484            Request::Validate { repo } => Ok(Some(Query::Validate { repo: repo.clone() })),
485
486            // Not queries
487            _ => Ok(None),
488        }
489    }
490}
491
492// =============================================================================
493// Response - IPC responses
494// =============================================================================
495
496/// IPC response.
497#[derive(Debug, Clone, Serialize, Deserialize)]
498#[serde(untagged)]
499#[allow(clippy::large_enum_variant)]
500pub enum Response {
501    Ok { ok: ResponsePayload },
502    Err { err: ErrorPayload },
503}
504
505impl Response {
506    /// Create a success response.
507    pub fn ok(payload: ResponsePayload) -> Self {
508        Response::Ok { ok: payload }
509    }
510
511    /// Create an error response.
512    pub fn err(error: impl Into<ErrorPayload>) -> Self {
513        Response::Err { err: error.into() }
514    }
515}
516
517/// Successful response payload.
518///
519/// Uses untagged serialization for backward compatibility. Unit-like variants
520/// use wrapper structs with a `result` field to avoid serializing as `null`,
521/// which would be ambiguous during deserialization.
522#[derive(Debug, Clone, Serialize, Deserialize)]
523#[serde(untagged)]
524#[allow(clippy::large_enum_variant)]
525pub enum ResponsePayload {
526    /// Mutation result.
527    Op(OpResult),
528
529    /// Query result.
530    Query(QueryResult),
531
532    /// Sync completed.
533    Synced(SyncedPayload),
534
535    /// Refresh completed (state reloaded from git).
536    Refreshed(RefreshedPayload),
537
538    /// Init completed.
539    Initialized(InitializedPayload),
540
541    /// Shutdown ack.
542    ShuttingDown(ShuttingDownPayload),
543}
544
545impl ResponsePayload {
546    /// Create a synced payload.
547    pub fn synced() -> Self {
548        ResponsePayload::Synced(SyncedPayload::default())
549    }
550
551    /// Create an initialized payload.
552    pub fn initialized() -> Self {
553        ResponsePayload::Initialized(InitializedPayload::default())
554    }
555
556    /// Create a refreshed payload.
557    pub fn refreshed() -> Self {
558        ResponsePayload::Refreshed(RefreshedPayload::default())
559    }
560
561    /// Create a shutting down payload.
562    pub fn shutting_down() -> Self {
563        ResponsePayload::ShuttingDown(ShuttingDownPayload::default())
564    }
565}
566
567/// Payload for sync completion. Uses typed discriminant for unambiguous deserialization.
568#[derive(Debug, Clone, Serialize, Deserialize, Default)]
569pub struct SyncedPayload {
570    result: SyncedTag,
571}
572
573#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
574enum SyncedTag {
575    #[default]
576    #[serde(rename = "synced")]
577    Synced,
578}
579
580/// Payload for refresh completion. Uses typed discriminant for unambiguous deserialization.
581#[derive(Debug, Clone, Serialize, Deserialize, Default)]
582pub struct RefreshedPayload {
583    result: RefreshedTag,
584}
585
586#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
587enum RefreshedTag {
588    #[default]
589    #[serde(rename = "refreshed")]
590    Refreshed,
591}
592
593/// Payload for init completion. Uses typed discriminant for unambiguous deserialization.
594#[derive(Debug, Clone, Serialize, Deserialize, Default)]
595pub struct InitializedPayload {
596    result: InitializedTag,
597}
598
599#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
600enum InitializedTag {
601    #[default]
602    #[serde(rename = "initialized")]
603    Initialized,
604}
605
606/// Payload for shutdown acknowledgment. Uses typed discriminant for unambiguous deserialization.
607#[derive(Debug, Clone, Serialize, Deserialize, Default)]
608pub struct ShuttingDownPayload {
609    result: ShuttingDownTag,
610}
611
612#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
613enum ShuttingDownTag {
614    #[default]
615    #[serde(rename = "shutting_down")]
616    ShuttingDown,
617}
618
619/// Error response payload.
620#[derive(Debug, Clone, Serialize, Deserialize)]
621pub struct ErrorPayload {
622    pub code: String,
623    pub message: String,
624    #[serde(skip_serializing_if = "Option::is_none")]
625    pub details: Option<serde_json::Value>,
626}
627
628impl From<OpError> for ErrorPayload {
629    fn from(e: OpError) -> Self {
630        let details = serde_json::json!({
631            "retryable": e.transience().is_retryable(),
632            "effect": e.effect().as_str(),
633        });
634        ErrorPayload {
635            code: e.code().to_string(),
636            message: e.to_string(),
637            details: Some(details),
638        }
639    }
640}
641
642impl From<IpcError> for ErrorPayload {
643    fn from(e: IpcError) -> Self {
644        let details = serde_json::json!({
645            "retryable": e.transience().is_retryable(),
646            "effect": e.effect().as_str(),
647        });
648        ErrorPayload {
649            code: e.code().to_string(),
650            message: e.to_string(),
651            details: Some(details),
652        }
653    }
654}
655
656// =============================================================================
657// IpcError
658// =============================================================================
659
660/// IPC-specific errors.
661#[derive(Error, Debug)]
662#[non_exhaustive]
663pub enum IpcError {
664    #[error("parse error: {0}")]
665    Parse(#[from] serde_json::Error),
666
667    #[error("IO error: {0}")]
668    Io(#[from] std::io::Error),
669
670    #[error(transparent)]
671    InvalidId(#[from] InvalidId),
672
673    #[error("client disconnected")]
674    Disconnected,
675
676    #[error("daemon unavailable: {0}")]
677    DaemonUnavailable(String),
678
679    #[error("daemon version mismatch; restart the daemon and retry")]
680    DaemonVersionMismatch {
681        daemon: Option<crate::api::DaemonInfo>,
682        client_version: String,
683        protocol_version: u32,
684        /// If set, the mismatch was detected via a parse failure.
685        parse_error: Option<String>,
686    },
687}
688
689impl IpcError {
690    pub fn code(&self) -> &'static str {
691        match self {
692            IpcError::Parse(_) => "parse_error",
693            IpcError::Io(_) => "io_error",
694            IpcError::InvalidId(_) => "invalid_id",
695            IpcError::Disconnected => "disconnected",
696            IpcError::DaemonUnavailable(_) => "daemon_unavailable",
697            IpcError::DaemonVersionMismatch { .. } => "daemon_version_mismatch",
698        }
699    }
700
701    /// Whether retrying the IPC operation may succeed.
702    pub fn transience(&self) -> Transience {
703        match self {
704            IpcError::DaemonUnavailable(_) | IpcError::Io(_) | IpcError::Disconnected => {
705                Transience::Retryable
706            }
707            IpcError::DaemonVersionMismatch { .. } => Transience::Retryable,
708            IpcError::Parse(_) | IpcError::InvalidId(_) => Transience::Permanent,
709        }
710    }
711
712    /// What we know about side effects when this IPC error is returned.
713    pub fn effect(&self) -> Effect {
714        match self {
715            IpcError::Io(_) | IpcError::Disconnected => Effect::Unknown,
716            IpcError::DaemonUnavailable(_) | IpcError::Parse(_) | IpcError::InvalidId(_) => {
717                Effect::None
718            }
719            IpcError::DaemonVersionMismatch { .. } => Effect::None,
720        }
721    }
722}
723
724fn map_core_invalid_id(e: CoreError) -> IpcError {
725    match e {
726        CoreError::InvalidId(invalid) => IpcError::InvalidId(invalid),
727        other => IpcError::DaemonUnavailable(other.to_string()),
728    }
729}
730
731// =============================================================================
732// Codec - Encoding/decoding
733// =============================================================================
734
735/// Encode a response to bytes.
736pub fn encode_response(resp: &Response) -> Result<Vec<u8>, IpcError> {
737    let mut bytes = serde_json::to_vec(resp)?;
738    bytes.push(b'\n');
739    Ok(bytes)
740}
741
742/// Decode a request from a line.
743pub fn decode_request(line: &str) -> Result<Request, IpcError> {
744    Ok(serde_json::from_str(line)?)
745}
746
747/// Send a response over a stream.
748pub fn send_response(stream: &mut UnixStream, resp: &Response) -> Result<(), IpcError> {
749    let bytes = encode_response(resp)?;
750    stream.write_all(&bytes)?;
751    Ok(())
752}
753
754/// Read requests from a stream.
755pub fn read_requests(stream: UnixStream) -> impl Iterator<Item = Result<Request, IpcError>> {
756    let reader = BufReader::new(stream);
757    reader.lines().map(|line| {
758        let line = line?;
759        decode_request(&line)
760    })
761}
762
763// =============================================================================
764// Socket path
765// =============================================================================
766
767/// Get the directory that will contain the daemon socket.
768pub fn socket_dir() -> PathBuf {
769    socket_dir_candidates()
770        .into_iter()
771        .next()
772        .unwrap_or_else(per_user_tmp_dir)
773}
774
775/// Ensure the socket directory exists and is user-private.
776pub fn ensure_socket_dir() -> Result<PathBuf, IpcError> {
777    let mut last_err: Option<std::io::Error> = None;
778    for dir in socket_dir_candidates() {
779        match fs::create_dir_all(&dir) {
780            Ok(()) => {
781                #[cfg(unix)]
782                {
783                    let mode = fs::metadata(&dir)?.permissions().mode() & 0o777;
784                    if mode != 0o700 {
785                        fs::set_permissions(&dir, fs::Permissions::from_mode(0o700))?;
786                    }
787                }
788                return Ok(dir);
789            }
790            Err(e) => last_err = Some(e),
791        }
792    }
793
794    Err(IpcError::Io(last_err.unwrap_or_else(|| {
795        std::io::Error::other("unable to create a writable socket directory")
796    })))
797}
798
799/// Get the daemon socket path.
800pub fn socket_path() -> PathBuf {
801    ensure_socket_dir()
802        .map(|dir| dir.join("daemon.sock"))
803        .unwrap_or_else(|_| per_user_tmp_dir().join("daemon.sock"))
804}
805
806/// Read daemon metadata from the meta file.
807/// Returns None if file doesn't exist or is corrupt.
808fn read_daemon_meta() -> Option<crate::api::DaemonInfo> {
809    let dir = ensure_socket_dir().ok()?;
810    let meta_path = dir.join("daemon.meta.json");
811    let contents = fs::read_to_string(&meta_path).ok()?;
812    serde_json::from_str(&contents).ok()
813}
814
815fn per_user_tmp_dir() -> PathBuf {
816    let uid = nix::unistd::geteuid();
817    PathBuf::from("/tmp").join(format!("beads-{}", uid))
818}
819
820fn socket_dir_candidates() -> Vec<PathBuf> {
821    let mut dirs = Vec::new();
822    if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR")
823        && !dir.trim().is_empty()
824    {
825        dirs.push(PathBuf::from(dir).join("beads"));
826    }
827    if let Ok(home) = std::env::var("HOME")
828        && !home.trim().is_empty()
829    {
830        dirs.push(PathBuf::from(home).join(".beads"));
831    }
832    dirs.push(per_user_tmp_dir());
833    dirs
834}
835
836// =============================================================================
837// Client - Send requests to daemon
838// =============================================================================
839
/// Decide whether a connect error means "no daemon is listening", in which
/// case starting one is worth a try.
///
/// True for `NotFound`, `ConnectionRefused`, `ConnectionReset` and
/// `ConnectionAborted`; false for every other error kind.
fn should_autostart(err: &std::io::Error) -> bool {
    use std::io::ErrorKind;

    const NO_DAEMON_KINDS: [ErrorKind; 4] = [
        ErrorKind::NotFound,
        ErrorKind::ConnectionRefused,
        ErrorKind::ConnectionReset,
        ErrorKind::ConnectionAborted,
    ];

    NO_DAEMON_KINDS.contains(&err.kind())
}
849
/// Delete the lock file if it was last modified more than 10 seconds ago.
///
/// Best effort: if the file's metadata, modification time, or age cannot be
/// determined, nothing happens, and a failed removal is ignored.
fn maybe_remove_stale_lock(lock_path: &PathBuf) {
    let is_stale = fs::metadata(lock_path)
        .and_then(|meta| meta.modified())
        .ok()
        .and_then(|mtime| mtime.elapsed().ok())
        .is_some_and(|age| age > Duration::from_secs(10));

    if is_stale {
        let _ = fs::remove_file(lock_path);
    }
}
859
/// Build the command that launches the daemon: `<program> daemon run`.
///
/// The program is the currently running executable when its path can be
/// determined, and `bd` (resolved via `PATH`) otherwise.
fn daemon_command() -> Command {
    let program = std::env::current_exe().unwrap_or_else(|_| PathBuf::from("bd"));
    let mut cmd = Command::new(program);
    cmd.args(["daemon", "run"]);
    cmd
}
871
872fn connect_with_autostart(socket: &PathBuf) -> Result<UnixStream, IpcError> {
873    match UnixStream::connect(socket) {
874        Ok(stream) => Ok(stream),
875        Err(e) if should_autostart(&e) => {
876            // Try to autostart daemon with a simple lock to avoid herds.
877            let dir = ensure_socket_dir()?;
878            let lock_path = dir.join("daemon.lock");
879            maybe_remove_stale_lock(&lock_path);
880
881            let mut we_spawned = OpenOptions::new()
882                .write(true)
883                .create_new(true)
884                .open(&lock_path)
885                .is_ok();
886
887            if we_spawned {
888                let mut cmd = daemon_command();
889                cmd.stdin(Stdio::null())
890                    .stdout(Stdio::null())
891                    .stderr(Stdio::null());
892                cmd.spawn().map_err(|e| {
893                    IpcError::DaemonUnavailable(format!("failed to spawn daemon: {}", e))
894                })?;
895            }
896
897            let deadline = SystemTime::now() + Duration::from_secs(30);
898            let mut backoff = Duration::from_millis(50);
899
900            loop {
901                match UnixStream::connect(socket) {
902                    Ok(stream) => {
903                        if we_spawned {
904                            let _ = fs::remove_file(&lock_path);
905                        }
906                        return Ok(stream);
907                    }
908                    Err(e) if should_autostart(&e) => {
909                        if !we_spawned {
910                            // If the lock disappeared (spawner died), try to take over.
911                            maybe_remove_stale_lock(&lock_path);
912                            if OpenOptions::new()
913                                .write(true)
914                                .create_new(true)
915                                .open(&lock_path)
916                                .is_ok()
917                            {
918                                we_spawned = true;
919                                let mut cmd = daemon_command();
920                                cmd.stdin(Stdio::null())
921                                    .stdout(Stdio::null())
922                                    .stderr(Stdio::null());
923                                if let Err(e) = cmd.spawn() {
924                                    let _ = fs::remove_file(&lock_path);
925                                    return Err(IpcError::DaemonUnavailable(format!(
926                                        "failed to spawn daemon: {}",
927                                        e
928                                    )));
929                                }
930                            }
931                        }
932                        if SystemTime::now() >= deadline {
933                            if we_spawned {
934                                let _ = fs::remove_file(&lock_path);
935                            }
936                            return Err(IpcError::DaemonUnavailable(
937                                "timed out waiting for daemon socket".into(),
938                            ));
939                        }
940                        std::thread::sleep(backoff);
941                        backoff = std::cmp::min(backoff * 2, Duration::from_millis(200));
942                    }
943                    Err(e) => {
944                        if we_spawned {
945                            let _ = fs::remove_file(&lock_path);
946                        }
947                        return Err(IpcError::Io(e));
948                    }
949                }
950            }
951        }
952        Err(e) => Err(IpcError::Io(e)),
953    }
954}
955
956fn write_req_line(stream: &mut UnixStream, req: &Request) -> Result<(), IpcError> {
957    let mut json = serde_json::to_string(req)?;
958    json.push('\n');
959    stream.write_all(json.as_bytes())?;
960    Ok(())
961}
962
963fn read_resp_line(reader: &mut BufReader<UnixStream>) -> Result<Response, IpcError> {
964    let mut line = String::new();
965    let bytes_read = reader.read_line(&mut line)?;
966    // EOF means daemon closed connection (likely just shut down)
967    if bytes_read == 0 || line.trim().is_empty() {
968        return Err(IpcError::DaemonUnavailable(
969            "daemon not running (stale socket)".into(),
970        ));
971    }
972    Ok(serde_json::from_str(&line)?)
973}
974
975/// Read response line, converting parse errors to version mismatch.
976///
977/// Used during version verification where a parse failure likely indicates
978/// an incompatible daemon version.
979fn read_resp_line_version_check(reader: &mut BufReader<UnixStream>) -> Result<Response, IpcError> {
980    let mut line = String::new();
981    let bytes_read = reader.read_line(&mut line)?;
982    if bytes_read == 0 || line.trim().is_empty() {
983        return Err(IpcError::DaemonUnavailable(
984            "daemon not running (stale socket)".into(),
985        ));
986    }
987    serde_json::from_str(&line).map_err(|e| IpcError::DaemonVersionMismatch {
988        daemon: None,
989        client_version: env!("CARGO_PKG_VERSION").to_string(),
990        protocol_version: IPC_PROTOCOL_VERSION,
991        parse_error: Some(e.to_string()),
992    })
993}
994
995fn verify_daemon_version(
996    writer: &mut UnixStream,
997    reader: &mut BufReader<UnixStream>,
998) -> Result<(), IpcError> {
999    write_req_line(writer, &Request::Ping)?;
1000    // Use version-check variant that converts parse errors to version mismatch
1001    let resp = read_resp_line_version_check(reader)?;
1002    let Response::Ok { ok } = resp else {
1003        return Err(IpcError::DaemonVersionMismatch {
1004            daemon: None,
1005            client_version: env!("CARGO_PKG_VERSION").to_string(),
1006            protocol_version: IPC_PROTOCOL_VERSION,
1007            parse_error: None,
1008        });
1009    };
1010
1011    let ResponsePayload::Query(QueryResult::DaemonInfo(info)) = ok else {
1012        return Err(IpcError::DaemonVersionMismatch {
1013            daemon: None,
1014            client_version: env!("CARGO_PKG_VERSION").to_string(),
1015            protocol_version: IPC_PROTOCOL_VERSION,
1016            parse_error: Some("unexpected response payload type".into()),
1017        });
1018    };
1019
1020    if info.protocol_version != IPC_PROTOCOL_VERSION || info.version != env!("CARGO_PKG_VERSION") {
1021        return Err(IpcError::DaemonVersionMismatch {
1022            daemon: Some(info),
1023            client_version: env!("CARGO_PKG_VERSION").to_string(),
1024            protocol_version: IPC_PROTOCOL_VERSION,
1025            parse_error: None,
1026        });
1027    }
1028
1029    Ok(())
1030}
1031
1032fn send_request_over_stream(stream: UnixStream, req: &Request) -> Result<Response, IpcError> {
1033    let mut writer = stream;
1034    let reader_stream = writer.try_clone()?;
1035    let mut reader = BufReader::new(reader_stream);
1036
1037    // Verify daemon version/protocol once per connection.
1038    if !matches!(req, Request::Ping) {
1039        verify_daemon_version(&mut writer, &mut reader)?;
1040    }
1041
1042    write_req_line(&mut writer, req)?;
1043    read_resp_line(&mut reader)
1044}
1045
1046/// Send a request to the daemon and receive a response.
1047///
1048/// Retries up to 3 times on version mismatch or stale socket errors,
1049/// with exponential backoff between attempts.
1050pub fn send_request(req: &Request) -> Result<Response, IpcError> {
1051    const MAX_ATTEMPTS: u32 = 3;
1052    let socket = socket_path();
1053
1054    for attempt in 1..=MAX_ATTEMPTS {
1055        let stream = match connect_with_autostart(&socket) {
1056            Ok(s) => s,
1057            Err(e) => {
1058                if attempt >= MAX_ATTEMPTS {
1059                    return Err(e);
1060                }
1061                let backoff = Duration::from_millis(100 * (1 << (attempt - 1)));
1062                std::thread::sleep(backoff);
1063                continue;
1064            }
1065        };
1066
1067        match send_request_over_stream(stream, req) {
1068            Ok(resp) => return Ok(resp),
1069            Err(IpcError::DaemonVersionMismatch { daemon, .. }) if attempt < MAX_ATTEMPTS => {
1070                tracing::info!(
1071                    "daemon version mismatch, restarting (attempt {}/{})",
1072                    attempt,
1073                    MAX_ATTEMPTS
1074                );
1075
1076                // Try to restart the daemon
1077                if let Some(info) = daemon {
1078                    let _ = kill_daemon_forcefully(info.pid, &socket);
1079                } else {
1080                    let _ = try_restart_daemon_by_socket(&socket);
1081                }
1082
1083                // Exponential backoff: 100ms, 200ms, 400ms
1084                let backoff = Duration::from_millis(100 * (1 << (attempt - 1)));
1085                std::thread::sleep(backoff);
1086            }
1087            Err(IpcError::DaemonUnavailable(ref msg)) if attempt < MAX_ATTEMPTS => {
1088                tracing::debug!("daemon unavailable ({}), retrying", msg);
1089                // Socket might have gone stale mid-request
1090                let _ = try_restart_daemon_by_socket(&socket);
1091                let backoff = Duration::from_millis(100 * (1 << (attempt - 1)));
1092                std::thread::sleep(backoff);
1093            }
1094            Err(e) => return Err(e),
1095        }
1096    }
1097
1098    Err(IpcError::DaemonUnavailable(
1099        "max retry attempts exceeded".into(),
1100    ))
1101}
1102
1103/// Send a request without auto-starting the daemon.
1104///
1105/// Returns `DaemonUnavailable` if daemon is not running.
1106pub fn send_request_no_autostart(req: &Request) -> Result<Response, IpcError> {
1107    let socket = socket_path();
1108    let stream = UnixStream::connect(&socket)
1109        .map_err(|e| IpcError::DaemonUnavailable(format!("daemon not running: {}", e)))?;
1110    send_request_over_stream(stream, req)
1111}
1112
1113/// Wait for daemon to be ready and responding with expected version.
1114///
1115/// Returns Ok if daemon is responsive with matching version, Err on timeout (30s).
1116pub fn wait_for_daemon_ready(expected_version: &str) -> Result<(), IpcError> {
1117    let socket = socket_path();
1118    let deadline = SystemTime::now() + Duration::from_secs(30);
1119    let mut backoff = Duration::from_millis(50);
1120
1121    while SystemTime::now() < deadline {
1122        match UnixStream::connect(&socket) {
1123            Ok(stream) => {
1124                let mut writer = stream;
1125                let reader_stream = match writer.try_clone() {
1126                    Ok(s) => s,
1127                    Err(_) => {
1128                        std::thread::sleep(backoff);
1129                        backoff = std::cmp::min(backoff * 2, Duration::from_millis(500));
1130                        continue;
1131                    }
1132                };
1133                let mut reader = BufReader::new(reader_stream);
1134
1135                if write_req_line(&mut writer, &Request::Ping).is_err() {
1136                    std::thread::sleep(backoff);
1137                    backoff = std::cmp::min(backoff * 2, Duration::from_millis(500));
1138                    continue;
1139                }
1140
1141                if let Ok(Response::Ok {
1142                    ok: ResponsePayload::Query(QueryResult::DaemonInfo(info)),
1143                }) = read_resp_line(&mut reader)
1144                {
1145                    if info.version == expected_version {
1146                        tracing::info!("daemon ready with version {}", expected_version);
1147                        return Ok(());
1148                    }
1149                    // Wrong version - old daemon hasn't fully died yet
1150                    tracing::debug!(
1151                        "daemon has version {}, waiting for {}",
1152                        info.version,
1153                        expected_version
1154                    );
1155                }
1156                std::thread::sleep(backoff);
1157                backoff = std::cmp::min(backoff * 2, Duration::from_millis(500));
1158            }
1159            Err(_) => {
1160                std::thread::sleep(backoff);
1161                backoff = std::cmp::min(backoff * 2, Duration::from_millis(200));
1162            }
1163        }
1164    }
1165
1166    Err(IpcError::DaemonUnavailable(format!(
1167        "timed out waiting for daemon version {}",
1168        expected_version
1169    )))
1170}
1171
1172/// Kill daemon with SIGTERM, escalating to SIGKILL if needed.
1173fn kill_daemon_forcefully(pid: u32, socket: &PathBuf) -> Result<(), IpcError> {
1174    use nix::sys::signal::{Signal, kill};
1175    use nix::unistd::Pid;
1176
1177    let nix_pid = Pid::from_raw(pid as i32);
1178
1179    // First try SIGTERM (graceful)
1180    if let Err(e) = kill(nix_pid, Signal::SIGTERM) {
1181        // ESRCH = no such process - already dead, that's fine
1182        if e == nix::errno::Errno::ESRCH {
1183            let _ = fs::remove_file(socket);
1184            let _ = fs::remove_file(socket.with_file_name("daemon.meta.json"));
1185            return Ok(());
1186        }
1187        return Err(IpcError::DaemonUnavailable(format!(
1188            "failed to stop daemon pid {pid}: {e}"
1189        )));
1190    }
1191
1192    // Wait for graceful shutdown (3 seconds)
1193    let deadline = SystemTime::now() + Duration::from_secs(3);
1194    while SystemTime::now() < deadline {
1195        if UnixStream::connect(socket).is_err() {
1196            return Ok(());
1197        }
1198        std::thread::sleep(Duration::from_millis(50));
1199    }
1200
1201    // Escalate to SIGKILL
1202    tracing::warn!(
1203        "daemon pid {} did not stop gracefully, sending SIGKILL",
1204        pid
1205    );
1206    if let Err(e) = kill(nix_pid, Signal::SIGKILL)
1207        && e != nix::errno::Errno::ESRCH
1208    {
1209        return Err(IpcError::DaemonUnavailable(format!(
1210            "failed to kill daemon pid {pid}: {e}"
1211        )));
1212    }
1213
1214    // Wait for socket to become stale (2 more seconds)
1215    let deadline = SystemTime::now() + Duration::from_secs(2);
1216    while SystemTime::now() < deadline {
1217        if UnixStream::connect(socket).is_err() {
1218            return Ok(());
1219        }
1220        std::thread::sleep(Duration::from_millis(50));
1221    }
1222
1223    // Force remove socket and meta as last resort
1224    let _ = fs::remove_file(socket);
1225    let _ = fs::remove_file(socket.with_file_name("daemon.meta.json"));
1226    Ok(())
1227}
1228
1229/// Try to restart daemon when we don't have the PID from response.
1230///
1231/// Uses daemon.meta.json to find PID if available.
1232fn try_restart_daemon_by_socket(socket: &PathBuf) -> Result<(), IpcError> {
1233    // Try to get PID from meta file first
1234    if let Some(meta) = read_daemon_meta() {
1235        tracing::debug!("found daemon pid {} from meta file", meta.pid);
1236        return kill_daemon_forcefully(meta.pid, socket);
1237    }
1238
1239    // No meta file (very old daemon or corrupt state)
1240    tracing::warn!("no daemon meta file found, removing stale socket");
1241
1242    // Best effort: remove socket file. The orphaned daemon will eventually
1243    // exit when it has no clients and no work.
1244    if let Err(e) = fs::remove_file(socket)
1245        && e.kind() != std::io::ErrorKind::NotFound
1246    {
1247        return Err(IpcError::DaemonUnavailable(format!(
1248            "failed to remove stale socket: {e}"
1249        )));
1250    }
1251
1252    // Also remove meta file if present
1253    let _ = fs::remove_file(socket.with_file_name("daemon.meta.json"));
1254
1255    Ok(())
1256}
1257
#[cfg(test)]
mod tests {
    //! Serialization and error-classification tests for the IPC types.

    use super::*;

    /// Serialize `value` to a JSON string, panicking if serialization fails.
    fn to_json<T: serde::Serialize>(value: &T) -> String {
        serde_json::to_string(value).unwrap()
    }

    /// Build a version-mismatch error with no daemon info attached.
    fn mismatch_without_daemon(parse_error: Option<&str>) -> IpcError {
        IpcError::DaemonVersionMismatch {
            daemon: None,
            client_version: "0.1.0".into(),
            protocol_version: 1,
            parse_error: parse_error.map(str::to_owned),
        }
    }

    #[test]
    fn request_roundtrip() {
        let original = Request::Create {
            repo: PathBuf::from("/test"),
            id: None,
            parent: None,
            title: String::from("test"),
            bead_type: BeadType::Task,
            priority: Priority::default(),
            description: None,
            design: None,
            acceptance_criteria: None,
            assignee: None,
            external_ref: None,
            estimated_minutes: None,
            labels: vec![],
            dependencies: vec![],
        };

        let encoded = to_json(&original);
        let decoded: Request = serde_json::from_str(&encoded).unwrap();

        let Request::Create { title, .. } = decoded else {
            panic!("wrong request type");
        };
        assert_eq!(title, "test");
    }

    #[test]
    fn response_ok() {
        let json = to_json(&Response::ok(ResponsePayload::synced()));
        // Synced serializes to {"result":"synced"}, not null
        for needle in ["\"ok\"", "\"result\":\"synced\""] {
            assert!(json.contains(needle));
        }
    }

    #[test]
    fn unit_variants_are_distinguishable() {
        // Each variant must serialize to a distinct, non-null value with result field
        let encoded = [
            ("synced", to_json(&ResponsePayload::synced())),
            ("initialized", to_json(&ResponsePayload::initialized())),
            ("shutting_down", to_json(&ResponsePayload::shutting_down())),
        ];

        for (tag, json) in &encoded {
            let expected = format!("\"result\":\"{tag}\"");
            assert!(json.contains(expected.as_str()));
            // No variant serializes as null
            assert!(!json.contains("null"));
        }

        // Every pair of encodings differs
        for (idx, (_, left)) in encoded.iter().enumerate() {
            for (_, right) in &encoded[idx + 1..] {
                assert_ne!(left, right);
            }
        }
    }

    #[test]
    fn unit_variants_roundtrip() {
        // Serialize a payload and parse the JSON straight back.
        let reparse = |payload: ResponsePayload| -> ResponsePayload {
            serde_json::from_str(&to_json(&payload)).unwrap()
        };

        assert!(matches!(
            reparse(ResponsePayload::synced()),
            ResponsePayload::Synced(_)
        ));
        assert!(matches!(
            reparse(ResponsePayload::initialized()),
            ResponsePayload::Initialized(_)
        ));
    }

    #[test]
    fn response_err() {
        let json = to_json(&Response::err(ErrorPayload {
            code: String::from("not_found"),
            message: String::from("bead not found"),
            details: None,
        }));
        for needle in ["\"err\"", "not_found"] {
            assert!(json.contains(needle));
        }
    }

    #[test]
    fn ping_info_serializes_as_query() {
        let payload = ResponsePayload::Query(QueryResult::DaemonInfo(crate::api::DaemonInfo {
            version: String::from("0.0.0-test"),
            protocol_version: IPC_PROTOCOL_VERSION,
            pid: 123,
        }));
        let json = to_json(&Response::ok(payload));

        // The result tag and every daemon-info field must be present in the JSON
        let expected_fragments = [
            "\"result\":\"daemon_info\"",
            "\"version\"",
            "\"protocol_version\"",
            "\"pid\"",
        ];
        for fragment in expected_fragments {
            assert!(json.contains(fragment));
        }
    }

    #[test]
    fn version_mismatch_error_includes_parse_error() {
        let err = mismatch_without_daemon(Some("data did not match any variant"));
        // Error should indicate version mismatch
        assert_eq!(err.code(), "daemon_version_mismatch");
        // The error message should be actionable
        assert!(err.to_string().contains("restart the daemon"));
    }

    #[test]
    fn version_mismatch_is_retryable() {
        assert!(mismatch_without_daemon(None).transience().is_retryable());
    }

    #[test]
    fn invalid_json_would_cause_parse_error() {
        // Simulate what an old/incompatible daemon might send
        let parsed = serde_json::from_str::<Response>(r#"{"unexpected": "format"}"#);
        // This is the error type that would be converted to DaemonVersionMismatch
        let err = parsed.unwrap_err();
        assert!(err.to_string().contains("did not match"));
    }

    // Regression tests: verify all ResponsePayload variants roundtrip through Response
    mod response_roundtrip {
        use super::*;

        /// Serialize, parse, and serialize again; both JSON texts must be equal.
        fn assert_roundtrips(resp: Response) {
            let first = serde_json::to_string(&resp).unwrap();
            let reparsed: Response = match serde_json::from_str(&first) {
                Ok(parsed) => parsed,
                Err(e) => panic!("Failed to parse: {e}\nJSON: {first}"),
            };
            // Re-serialize to verify structural equality
            let second = serde_json::to_string(&reparsed).unwrap();
            assert_eq!(first, second, "Roundtrip mismatch");
        }

        /// Wrap an operation result in an `Ok` response.
        fn op_response(result: OpResult) -> Response {
            Response::ok(ResponsePayload::Op(result))
        }

        /// Wrap a query result in an `Ok` response.
        fn query_response(result: QueryResult) -> Response {
            Response::ok(ResponsePayload::Query(result))
        }

        #[test]
        fn op_created() {
            let id = BeadId::parse("bd-abc").unwrap();
            assert_roundtrips(op_response(OpResult::Created { id }));
        }

        #[test]
        fn op_updated() {
            let id = BeadId::parse("bd-abc").unwrap();
            assert_roundtrips(op_response(OpResult::Updated { id }));
        }

        #[test]
        fn query_issues_empty() {
            assert_roundtrips(query_response(QueryResult::Issues(vec![])));
        }

        #[test]
        fn query_daemon_info() {
            assert_roundtrips(query_response(QueryResult::DaemonInfo(
                crate::api::DaemonInfo {
                    version: "0.1.0".into(),
                    protocol_version: IPC_PROTOCOL_VERSION,
                    pid: 12345,
                },
            )));
        }

        #[test]
        fn synced() {
            assert_roundtrips(Response::ok(ResponsePayload::synced()));
        }

        #[test]
        fn initialized() {
            assert_roundtrips(Response::ok(ResponsePayload::initialized()));
        }

        #[test]
        fn shutting_down() {
            assert_roundtrips(Response::ok(ResponsePayload::shutting_down()));
        }

        #[test]
        fn error_response() {
            assert_roundtrips(Response::err(ErrorPayload {
                code: "test_error".into(),
                message: "Something went wrong".into(),
                details: Some(serde_json::json!({"key": "value"})),
            }));
        }
    }
}