1use std::fs;
9use std::fs::OpenOptions;
10use std::io::{BufRead, BufReader, Write};
11use std::os::unix::fs::PermissionsExt;
12use std::os::unix::net::UnixStream;
13use std::path::PathBuf;
14use std::process::{Command, Stdio};
15use std::time::{Duration, SystemTime};
16
17use serde::{Deserialize, Serialize};
18use thiserror::Error;
19
20use super::ops::{BeadOp, BeadPatch, OpError, OpResult};
21use super::query::{Filters, Query, QueryResult};
22use crate::core::{BeadId, BeadType, CoreError, DepKind, InvalidId, Priority};
23use crate::error::{Effect, Transience};
24
25pub const IPC_PROTOCOL_VERSION: u32 = 1;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
33#[serde(tag = "op", rename_all = "snake_case")]
34pub enum Request {
35 Create {
38 repo: PathBuf,
39 #[serde(default)]
40 id: Option<String>,
41 #[serde(default)]
42 parent: Option<String>,
43 title: String,
44 #[serde(rename = "type")]
45 bead_type: BeadType,
46 priority: Priority,
47 #[serde(default)]
48 description: Option<String>,
49 #[serde(default)]
50 design: Option<String>,
51 #[serde(default)]
52 acceptance_criteria: Option<String>,
53 #[serde(default)]
54 assignee: Option<String>,
55 #[serde(default)]
56 external_ref: Option<String>,
57 #[serde(default)]
58 estimated_minutes: Option<u32>,
59 #[serde(default)]
60 labels: Vec<String>,
61 #[serde(default)]
62 dependencies: Vec<String>,
63 },
64
65 Update {
67 repo: PathBuf,
68 id: String,
69 patch: BeadPatch,
70 #[serde(default)]
71 cas: Option<String>,
72 },
73
74 Close {
76 repo: PathBuf,
77 id: String,
78 #[serde(default)]
79 reason: Option<String>,
80 #[serde(default)]
81 on_branch: Option<String>,
82 },
83
84 Reopen { repo: PathBuf, id: String },
86
87 Delete {
89 repo: PathBuf,
90 id: String,
91 #[serde(default)]
92 reason: Option<String>,
93 },
94
95 AddDep {
97 repo: PathBuf,
98 from: String,
99 to: String,
100 kind: DepKind,
101 },
102
103 RemoveDep {
105 repo: PathBuf,
106 from: String,
107 to: String,
108 kind: DepKind,
109 },
110
111 AddNote {
113 repo: PathBuf,
114 id: String,
115 content: String,
116 },
117
118 Claim {
120 repo: PathBuf,
121 id: String,
122 #[serde(default = "default_lease_secs")]
123 lease_secs: u64,
124 },
125
126 Unclaim { repo: PathBuf, id: String },
128
129 ExtendClaim {
131 repo: PathBuf,
132 id: String,
133 lease_secs: u64,
134 },
135
136 Show { repo: PathBuf, id: String },
139
140 List {
142 repo: PathBuf,
143 #[serde(default)]
144 filters: Filters,
145 },
146
147 Ready {
149 repo: PathBuf,
150 #[serde(default)]
151 limit: Option<usize>,
152 },
153
154 DepTree { repo: PathBuf, id: String },
156
157 Deps { repo: PathBuf, id: String },
159
160 Notes { repo: PathBuf, id: String },
162
163 Blocked { repo: PathBuf },
165
166 Stale {
168 repo: PathBuf,
169 #[serde(default)]
170 days: u32,
171 #[serde(default)]
172 status: Option<String>,
173 #[serde(default)]
174 limit: Option<usize>,
175 },
176
177 Count {
179 repo: PathBuf,
180 #[serde(default)]
181 filters: Filters,
182 #[serde(default)]
183 group_by: Option<String>,
184 },
185
186 Deleted {
188 repo: PathBuf,
189 #[serde(default)]
190 since_ms: Option<u64>,
191 #[serde(default)]
192 id: Option<String>,
193 },
194
195 EpicStatus {
197 repo: PathBuf,
198 #[serde(default)]
199 eligible_only: bool,
200 },
201
202 Refresh { repo: PathBuf },
206
207 Sync { repo: PathBuf },
209
210 SyncWait { repo: PathBuf },
212
213 Init { repo: PathBuf },
215
216 Status { repo: PathBuf },
218
219 Validate { repo: PathBuf },
221
222 Ping,
224
225 Shutdown,
227}
228
229fn default_lease_secs() -> u64 {
230 3600 }
232
233impl Request {
234 pub fn to_op(&self) -> Result<Option<BeadOp>, IpcError> {
236 match self {
237 Request::Create {
238 repo,
239 id,
240 parent,
241 title,
242 bead_type,
243 priority,
244 description,
245 design,
246 acceptance_criteria,
247 assignee,
248 external_ref,
249 estimated_minutes,
250 labels,
251 dependencies,
252 } => Ok(Some(BeadOp::Create {
253 repo: repo.clone(),
254 id: id.clone(),
255 parent: parent.clone(),
256 title: title.clone(),
257 bead_type: *bead_type,
258 priority: *priority,
259 description: description.clone(),
260 design: design.clone(),
261 acceptance_criteria: acceptance_criteria.clone(),
262 assignee: assignee.clone(),
263 external_ref: external_ref.clone(),
264 estimated_minutes: *estimated_minutes,
265 labels: labels.clone(),
266 dependencies: dependencies.clone(),
267 })),
268
269 Request::Update {
270 repo,
271 id,
272 patch,
273 cas,
274 } => {
275 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
276 Ok(Some(BeadOp::Update {
277 repo: repo.clone(),
278 id,
279 patch: patch.clone(),
280 cas: cas.clone(),
281 }))
282 }
283
284 Request::Close {
285 repo,
286 id,
287 reason,
288 on_branch,
289 } => {
290 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
291 Ok(Some(BeadOp::Close {
292 repo: repo.clone(),
293 id,
294 reason: reason.clone(),
295 on_branch: on_branch.clone(),
296 }))
297 }
298
299 Request::Reopen { repo, id } => {
300 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
301 Ok(Some(BeadOp::Reopen {
302 repo: repo.clone(),
303 id,
304 }))
305 }
306
307 Request::Delete { repo, id, reason } => {
308 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
309 Ok(Some(BeadOp::Delete {
310 repo: repo.clone(),
311 id,
312 reason: reason.clone(),
313 }))
314 }
315
316 Request::AddDep {
317 repo,
318 from,
319 to,
320 kind,
321 } => {
322 let from = BeadId::parse(from).map_err(map_core_invalid_id)?;
323 let to = BeadId::parse(to).map_err(map_core_invalid_id)?;
324 Ok(Some(BeadOp::AddDep {
325 repo: repo.clone(),
326 from,
327 to,
328 kind: *kind,
329 }))
330 }
331
332 Request::RemoveDep {
333 repo,
334 from,
335 to,
336 kind,
337 } => {
338 let from = BeadId::parse(from).map_err(map_core_invalid_id)?;
339 let to = BeadId::parse(to).map_err(map_core_invalid_id)?;
340 Ok(Some(BeadOp::RemoveDep {
341 repo: repo.clone(),
342 from,
343 to,
344 kind: *kind,
345 }))
346 }
347
348 Request::AddNote { repo, id, content } => {
349 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
350 Ok(Some(BeadOp::AddNote {
351 repo: repo.clone(),
352 id,
353 content: content.clone(),
354 }))
355 }
356
357 Request::Claim {
358 repo,
359 id,
360 lease_secs,
361 } => {
362 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
363 Ok(Some(BeadOp::Claim {
364 repo: repo.clone(),
365 id,
366 lease_secs: *lease_secs,
367 }))
368 }
369
370 Request::Unclaim { repo, id } => {
371 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
372 Ok(Some(BeadOp::Unclaim {
373 repo: repo.clone(),
374 id,
375 }))
376 }
377
378 Request::ExtendClaim {
379 repo,
380 id,
381 lease_secs,
382 } => {
383 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
384 Ok(Some(BeadOp::ExtendClaim {
385 repo: repo.clone(),
386 id,
387 lease_secs: *lease_secs,
388 }))
389 }
390
391 _ => Ok(None),
393 }
394 }
395
396 pub fn to_query(&self) -> Result<Option<Query>, IpcError> {
398 match self {
399 Request::Show { repo, id } => {
400 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
401 Ok(Some(Query::Show {
402 repo: repo.clone(),
403 id,
404 }))
405 }
406
407 Request::List { repo, filters } => Ok(Some(Query::List {
408 repo: repo.clone(),
409 filters: filters.clone(),
410 })),
411
412 Request::Ready { repo, limit } => Ok(Some(Query::Ready {
413 repo: repo.clone(),
414 limit: *limit,
415 })),
416
417 Request::DepTree { repo, id } => {
418 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
419 Ok(Some(Query::DepTree {
420 repo: repo.clone(),
421 id,
422 }))
423 }
424
425 Request::Deps { repo, id } => {
426 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
427 Ok(Some(Query::Deps {
428 repo: repo.clone(),
429 id,
430 }))
431 }
432
433 Request::Notes { repo, id } => {
434 let id = BeadId::parse(id).map_err(map_core_invalid_id)?;
435 Ok(Some(Query::Notes {
436 repo: repo.clone(),
437 id,
438 }))
439 }
440
441 Request::Status { repo } => Ok(Some(Query::Status { repo: repo.clone() })),
442
443 Request::Blocked { repo } => Ok(Some(Query::Blocked { repo: repo.clone() })),
444
445 Request::Stale {
446 repo,
447 days,
448 status,
449 limit,
450 } => Ok(Some(Query::Stale {
451 repo: repo.clone(),
452 days: *days,
453 status: status.clone(),
454 limit: *limit,
455 })),
456
457 Request::Count {
458 repo,
459 filters,
460 group_by,
461 } => Ok(Some(Query::Count {
462 repo: repo.clone(),
463 filter: filters.clone(),
464 group_by: group_by.clone(),
465 })),
466
467 Request::Deleted { repo, since_ms, id } => Ok(Some(Query::Deleted {
468 repo: repo.clone(),
469 since_ms: *since_ms,
470 id: id
471 .as_ref()
472 .map(|s| BeadId::parse(s).map_err(map_core_invalid_id))
473 .transpose()?,
474 })),
475
476 Request::EpicStatus {
477 repo,
478 eligible_only,
479 } => Ok(Some(Query::EpicStatus {
480 repo: repo.clone(),
481 eligible_only: *eligible_only,
482 })),
483
484 Request::Validate { repo } => Ok(Some(Query::Validate { repo: repo.clone() })),
485
486 _ => Ok(None),
488 }
489 }
490}
491
492#[derive(Debug, Clone, Serialize, Deserialize)]
498#[serde(untagged)]
499#[allow(clippy::large_enum_variant)]
500pub enum Response {
501 Ok { ok: ResponsePayload },
502 Err { err: ErrorPayload },
503}
504
505impl Response {
506 pub fn ok(payload: ResponsePayload) -> Self {
508 Response::Ok { ok: payload }
509 }
510
511 pub fn err(error: impl Into<ErrorPayload>) -> Self {
513 Response::Err { err: error.into() }
514 }
515}
516
517#[derive(Debug, Clone, Serialize, Deserialize)]
523#[serde(untagged)]
524#[allow(clippy::large_enum_variant)]
525pub enum ResponsePayload {
526 Op(OpResult),
528
529 Query(QueryResult),
531
532 Synced(SyncedPayload),
534
535 Refreshed(RefreshedPayload),
537
538 Initialized(InitializedPayload),
540
541 ShuttingDown(ShuttingDownPayload),
543}
544
545impl ResponsePayload {
546 pub fn synced() -> Self {
548 ResponsePayload::Synced(SyncedPayload::default())
549 }
550
551 pub fn initialized() -> Self {
553 ResponsePayload::Initialized(InitializedPayload::default())
554 }
555
556 pub fn refreshed() -> Self {
558 ResponsePayload::Refreshed(RefreshedPayload::default())
559 }
560
561 pub fn shutting_down() -> Self {
563 ResponsePayload::ShuttingDown(ShuttingDownPayload::default())
564 }
565}
566
567#[derive(Debug, Clone, Serialize, Deserialize, Default)]
569pub struct SyncedPayload {
570 result: SyncedTag,
571}
572
573#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
574enum SyncedTag {
575 #[default]
576 #[serde(rename = "synced")]
577 Synced,
578}
579
580#[derive(Debug, Clone, Serialize, Deserialize, Default)]
582pub struct RefreshedPayload {
583 result: RefreshedTag,
584}
585
586#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
587enum RefreshedTag {
588 #[default]
589 #[serde(rename = "refreshed")]
590 Refreshed,
591}
592
593#[derive(Debug, Clone, Serialize, Deserialize, Default)]
595pub struct InitializedPayload {
596 result: InitializedTag,
597}
598
599#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
600enum InitializedTag {
601 #[default]
602 #[serde(rename = "initialized")]
603 Initialized,
604}
605
606#[derive(Debug, Clone, Serialize, Deserialize, Default)]
608pub struct ShuttingDownPayload {
609 result: ShuttingDownTag,
610}
611
612#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
613enum ShuttingDownTag {
614 #[default]
615 #[serde(rename = "shutting_down")]
616 ShuttingDown,
617}
618
619#[derive(Debug, Clone, Serialize, Deserialize)]
621pub struct ErrorPayload {
622 pub code: String,
623 pub message: String,
624 #[serde(skip_serializing_if = "Option::is_none")]
625 pub details: Option<serde_json::Value>,
626}
627
628impl From<OpError> for ErrorPayload {
629 fn from(e: OpError) -> Self {
630 let details = serde_json::json!({
631 "retryable": e.transience().is_retryable(),
632 "effect": e.effect().as_str(),
633 });
634 ErrorPayload {
635 code: e.code().to_string(),
636 message: e.to_string(),
637 details: Some(details),
638 }
639 }
640}
641
642impl From<IpcError> for ErrorPayload {
643 fn from(e: IpcError) -> Self {
644 let details = serde_json::json!({
645 "retryable": e.transience().is_retryable(),
646 "effect": e.effect().as_str(),
647 });
648 ErrorPayload {
649 code: e.code().to_string(),
650 message: e.to_string(),
651 details: Some(details),
652 }
653 }
654}
655
656#[derive(Error, Debug)]
662#[non_exhaustive]
663pub enum IpcError {
664 #[error("parse error: {0}")]
665 Parse(#[from] serde_json::Error),
666
667 #[error("IO error: {0}")]
668 Io(#[from] std::io::Error),
669
670 #[error(transparent)]
671 InvalidId(#[from] InvalidId),
672
673 #[error("client disconnected")]
674 Disconnected,
675
676 #[error("daemon unavailable: {0}")]
677 DaemonUnavailable(String),
678
679 #[error("daemon version mismatch; restart the daemon and retry")]
680 DaemonVersionMismatch {
681 daemon: Option<crate::api::DaemonInfo>,
682 client_version: String,
683 protocol_version: u32,
684 parse_error: Option<String>,
686 },
687}
688
689impl IpcError {
690 pub fn code(&self) -> &'static str {
691 match self {
692 IpcError::Parse(_) => "parse_error",
693 IpcError::Io(_) => "io_error",
694 IpcError::InvalidId(_) => "invalid_id",
695 IpcError::Disconnected => "disconnected",
696 IpcError::DaemonUnavailable(_) => "daemon_unavailable",
697 IpcError::DaemonVersionMismatch { .. } => "daemon_version_mismatch",
698 }
699 }
700
701 pub fn transience(&self) -> Transience {
703 match self {
704 IpcError::DaemonUnavailable(_) | IpcError::Io(_) | IpcError::Disconnected => {
705 Transience::Retryable
706 }
707 IpcError::DaemonVersionMismatch { .. } => Transience::Retryable,
708 IpcError::Parse(_) | IpcError::InvalidId(_) => Transience::Permanent,
709 }
710 }
711
712 pub fn effect(&self) -> Effect {
714 match self {
715 IpcError::Io(_) | IpcError::Disconnected => Effect::Unknown,
716 IpcError::DaemonUnavailable(_) | IpcError::Parse(_) | IpcError::InvalidId(_) => {
717 Effect::None
718 }
719 IpcError::DaemonVersionMismatch { .. } => Effect::None,
720 }
721 }
722}
723
724fn map_core_invalid_id(e: CoreError) -> IpcError {
725 match e {
726 CoreError::InvalidId(invalid) => IpcError::InvalidId(invalid),
727 other => IpcError::DaemonUnavailable(other.to_string()),
728 }
729}
730
731pub fn encode_response(resp: &Response) -> Result<Vec<u8>, IpcError> {
737 let mut bytes = serde_json::to_vec(resp)?;
738 bytes.push(b'\n');
739 Ok(bytes)
740}
741
742pub fn decode_request(line: &str) -> Result<Request, IpcError> {
744 Ok(serde_json::from_str(line)?)
745}
746
747pub fn send_response(stream: &mut UnixStream, resp: &Response) -> Result<(), IpcError> {
749 let bytes = encode_response(resp)?;
750 stream.write_all(&bytes)?;
751 Ok(())
752}
753
754pub fn read_requests(stream: UnixStream) -> impl Iterator<Item = Result<Request, IpcError>> {
756 let reader = BufReader::new(stream);
757 reader.lines().map(|line| {
758 let line = line?;
759 decode_request(&line)
760 })
761}
762
763pub fn socket_dir() -> PathBuf {
769 socket_dir_candidates()
770 .into_iter()
771 .next()
772 .unwrap_or_else(per_user_tmp_dir)
773}
774
775pub fn ensure_socket_dir() -> Result<PathBuf, IpcError> {
777 let mut last_err: Option<std::io::Error> = None;
778 for dir in socket_dir_candidates() {
779 match fs::create_dir_all(&dir) {
780 Ok(()) => {
781 #[cfg(unix)]
782 {
783 let mode = fs::metadata(&dir)?.permissions().mode() & 0o777;
784 if mode != 0o700 {
785 fs::set_permissions(&dir, fs::Permissions::from_mode(0o700))?;
786 }
787 }
788 return Ok(dir);
789 }
790 Err(e) => last_err = Some(e),
791 }
792 }
793
794 Err(IpcError::Io(last_err.unwrap_or_else(|| {
795 std::io::Error::other("unable to create a writable socket directory")
796 })))
797}
798
799pub fn socket_path() -> PathBuf {
801 ensure_socket_dir()
802 .map(|dir| dir.join("daemon.sock"))
803 .unwrap_or_else(|_| per_user_tmp_dir().join("daemon.sock"))
804}
805
806fn read_daemon_meta() -> Option<crate::api::DaemonInfo> {
809 let dir = ensure_socket_dir().ok()?;
810 let meta_path = dir.join("daemon.meta.json");
811 let contents = fs::read_to_string(&meta_path).ok()?;
812 serde_json::from_str(&contents).ok()
813}
814
815fn per_user_tmp_dir() -> PathBuf {
816 let uid = nix::unistd::geteuid();
817 PathBuf::from("/tmp").join(format!("beads-{}", uid))
818}
819
820fn socket_dir_candidates() -> Vec<PathBuf> {
821 let mut dirs = Vec::new();
822 if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR")
823 && !dir.trim().is_empty()
824 {
825 dirs.push(PathBuf::from(dir).join("beads"));
826 }
827 if let Ok(home) = std::env::var("HOME")
828 && !home.trim().is_empty()
829 {
830 dirs.push(PathBuf::from(home).join(".beads"));
831 }
832 dirs.push(per_user_tmp_dir());
833 dirs
834}
835
836fn should_autostart(err: &std::io::Error) -> bool {
841 matches!(
842 err.kind(),
843 std::io::ErrorKind::NotFound
844 | std::io::ErrorKind::ConnectionRefused
845 | std::io::ErrorKind::ConnectionReset
846 | std::io::ErrorKind::ConnectionAborted
847 )
848}
849
850fn maybe_remove_stale_lock(lock_path: &PathBuf) {
851 if let Ok(meta) = fs::metadata(lock_path)
852 && let Ok(modified) = meta.modified()
853 && let Ok(age) = modified.elapsed()
854 && age > Duration::from_secs(10)
855 {
856 let _ = fs::remove_file(lock_path);
857 }
858}
859
860fn daemon_command() -> Command {
861 if let Ok(exe) = std::env::current_exe() {
862 let mut cmd = Command::new(exe);
863 cmd.arg("daemon").arg("run");
864 return cmd;
865 }
866
867 let mut cmd = Command::new("bd");
868 cmd.arg("daemon").arg("run");
869 cmd
870}
871
872fn connect_with_autostart(socket: &PathBuf) -> Result<UnixStream, IpcError> {
873 match UnixStream::connect(socket) {
874 Ok(stream) => Ok(stream),
875 Err(e) if should_autostart(&e) => {
876 let dir = ensure_socket_dir()?;
878 let lock_path = dir.join("daemon.lock");
879 maybe_remove_stale_lock(&lock_path);
880
881 let mut we_spawned = OpenOptions::new()
882 .write(true)
883 .create_new(true)
884 .open(&lock_path)
885 .is_ok();
886
887 if we_spawned {
888 let mut cmd = daemon_command();
889 cmd.stdin(Stdio::null())
890 .stdout(Stdio::null())
891 .stderr(Stdio::null());
892 cmd.spawn().map_err(|e| {
893 IpcError::DaemonUnavailable(format!("failed to spawn daemon: {}", e))
894 })?;
895 }
896
897 let deadline = SystemTime::now() + Duration::from_secs(30);
898 let mut backoff = Duration::from_millis(50);
899
900 loop {
901 match UnixStream::connect(socket) {
902 Ok(stream) => {
903 if we_spawned {
904 let _ = fs::remove_file(&lock_path);
905 }
906 return Ok(stream);
907 }
908 Err(e) if should_autostart(&e) => {
909 if !we_spawned {
910 maybe_remove_stale_lock(&lock_path);
912 if OpenOptions::new()
913 .write(true)
914 .create_new(true)
915 .open(&lock_path)
916 .is_ok()
917 {
918 we_spawned = true;
919 let mut cmd = daemon_command();
920 cmd.stdin(Stdio::null())
921 .stdout(Stdio::null())
922 .stderr(Stdio::null());
923 if let Err(e) = cmd.spawn() {
924 let _ = fs::remove_file(&lock_path);
925 return Err(IpcError::DaemonUnavailable(format!(
926 "failed to spawn daemon: {}",
927 e
928 )));
929 }
930 }
931 }
932 if SystemTime::now() >= deadline {
933 if we_spawned {
934 let _ = fs::remove_file(&lock_path);
935 }
936 return Err(IpcError::DaemonUnavailable(
937 "timed out waiting for daemon socket".into(),
938 ));
939 }
940 std::thread::sleep(backoff);
941 backoff = std::cmp::min(backoff * 2, Duration::from_millis(200));
942 }
943 Err(e) => {
944 if we_spawned {
945 let _ = fs::remove_file(&lock_path);
946 }
947 return Err(IpcError::Io(e));
948 }
949 }
950 }
951 }
952 Err(e) => Err(IpcError::Io(e)),
953 }
954}
955
956fn write_req_line(stream: &mut UnixStream, req: &Request) -> Result<(), IpcError> {
957 let mut json = serde_json::to_string(req)?;
958 json.push('\n');
959 stream.write_all(json.as_bytes())?;
960 Ok(())
961}
962
963fn read_resp_line(reader: &mut BufReader<UnixStream>) -> Result<Response, IpcError> {
964 let mut line = String::new();
965 let bytes_read = reader.read_line(&mut line)?;
966 if bytes_read == 0 || line.trim().is_empty() {
968 return Err(IpcError::DaemonUnavailable(
969 "daemon not running (stale socket)".into(),
970 ));
971 }
972 Ok(serde_json::from_str(&line)?)
973}
974
975fn read_resp_line_version_check(reader: &mut BufReader<UnixStream>) -> Result<Response, IpcError> {
980 let mut line = String::new();
981 let bytes_read = reader.read_line(&mut line)?;
982 if bytes_read == 0 || line.trim().is_empty() {
983 return Err(IpcError::DaemonUnavailable(
984 "daemon not running (stale socket)".into(),
985 ));
986 }
987 serde_json::from_str(&line).map_err(|e| IpcError::DaemonVersionMismatch {
988 daemon: None,
989 client_version: env!("CARGO_PKG_VERSION").to_string(),
990 protocol_version: IPC_PROTOCOL_VERSION,
991 parse_error: Some(e.to_string()),
992 })
993}
994
995fn verify_daemon_version(
996 writer: &mut UnixStream,
997 reader: &mut BufReader<UnixStream>,
998) -> Result<(), IpcError> {
999 write_req_line(writer, &Request::Ping)?;
1000 let resp = read_resp_line_version_check(reader)?;
1002 let Response::Ok { ok } = resp else {
1003 return Err(IpcError::DaemonVersionMismatch {
1004 daemon: None,
1005 client_version: env!("CARGO_PKG_VERSION").to_string(),
1006 protocol_version: IPC_PROTOCOL_VERSION,
1007 parse_error: None,
1008 });
1009 };
1010
1011 let ResponsePayload::Query(QueryResult::DaemonInfo(info)) = ok else {
1012 return Err(IpcError::DaemonVersionMismatch {
1013 daemon: None,
1014 client_version: env!("CARGO_PKG_VERSION").to_string(),
1015 protocol_version: IPC_PROTOCOL_VERSION,
1016 parse_error: Some("unexpected response payload type".into()),
1017 });
1018 };
1019
1020 if info.protocol_version != IPC_PROTOCOL_VERSION || info.version != env!("CARGO_PKG_VERSION") {
1021 return Err(IpcError::DaemonVersionMismatch {
1022 daemon: Some(info),
1023 client_version: env!("CARGO_PKG_VERSION").to_string(),
1024 protocol_version: IPC_PROTOCOL_VERSION,
1025 parse_error: None,
1026 });
1027 }
1028
1029 Ok(())
1030}
1031
1032fn send_request_over_stream(stream: UnixStream, req: &Request) -> Result<Response, IpcError> {
1033 let mut writer = stream;
1034 let reader_stream = writer.try_clone()?;
1035 let mut reader = BufReader::new(reader_stream);
1036
1037 if !matches!(req, Request::Ping) {
1039 verify_daemon_version(&mut writer, &mut reader)?;
1040 }
1041
1042 write_req_line(&mut writer, req)?;
1043 read_resp_line(&mut reader)
1044}
1045
1046pub fn send_request(req: &Request) -> Result<Response, IpcError> {
1051 const MAX_ATTEMPTS: u32 = 3;
1052 let socket = socket_path();
1053
1054 for attempt in 1..=MAX_ATTEMPTS {
1055 let stream = match connect_with_autostart(&socket) {
1056 Ok(s) => s,
1057 Err(e) => {
1058 if attempt >= MAX_ATTEMPTS {
1059 return Err(e);
1060 }
1061 let backoff = Duration::from_millis(100 * (1 << (attempt - 1)));
1062 std::thread::sleep(backoff);
1063 continue;
1064 }
1065 };
1066
1067 match send_request_over_stream(stream, req) {
1068 Ok(resp) => return Ok(resp),
1069 Err(IpcError::DaemonVersionMismatch { daemon, .. }) if attempt < MAX_ATTEMPTS => {
1070 tracing::info!(
1071 "daemon version mismatch, restarting (attempt {}/{})",
1072 attempt,
1073 MAX_ATTEMPTS
1074 );
1075
1076 if let Some(info) = daemon {
1078 let _ = kill_daemon_forcefully(info.pid, &socket);
1079 } else {
1080 let _ = try_restart_daemon_by_socket(&socket);
1081 }
1082
1083 let backoff = Duration::from_millis(100 * (1 << (attempt - 1)));
1085 std::thread::sleep(backoff);
1086 }
1087 Err(IpcError::DaemonUnavailable(ref msg)) if attempt < MAX_ATTEMPTS => {
1088 tracing::debug!("daemon unavailable ({}), retrying", msg);
1089 let _ = try_restart_daemon_by_socket(&socket);
1091 let backoff = Duration::from_millis(100 * (1 << (attempt - 1)));
1092 std::thread::sleep(backoff);
1093 }
1094 Err(e) => return Err(e),
1095 }
1096 }
1097
1098 Err(IpcError::DaemonUnavailable(
1099 "max retry attempts exceeded".into(),
1100 ))
1101}
1102
1103pub fn send_request_no_autostart(req: &Request) -> Result<Response, IpcError> {
1107 let socket = socket_path();
1108 let stream = UnixStream::connect(&socket)
1109 .map_err(|e| IpcError::DaemonUnavailable(format!("daemon not running: {}", e)))?;
1110 send_request_over_stream(stream, req)
1111}
1112
1113pub fn wait_for_daemon_ready(expected_version: &str) -> Result<(), IpcError> {
1117 let socket = socket_path();
1118 let deadline = SystemTime::now() + Duration::from_secs(30);
1119 let mut backoff = Duration::from_millis(50);
1120
1121 while SystemTime::now() < deadline {
1122 match UnixStream::connect(&socket) {
1123 Ok(stream) => {
1124 let mut writer = stream;
1125 let reader_stream = match writer.try_clone() {
1126 Ok(s) => s,
1127 Err(_) => {
1128 std::thread::sleep(backoff);
1129 backoff = std::cmp::min(backoff * 2, Duration::from_millis(500));
1130 continue;
1131 }
1132 };
1133 let mut reader = BufReader::new(reader_stream);
1134
1135 if write_req_line(&mut writer, &Request::Ping).is_err() {
1136 std::thread::sleep(backoff);
1137 backoff = std::cmp::min(backoff * 2, Duration::from_millis(500));
1138 continue;
1139 }
1140
1141 if let Ok(Response::Ok {
1142 ok: ResponsePayload::Query(QueryResult::DaemonInfo(info)),
1143 }) = read_resp_line(&mut reader)
1144 {
1145 if info.version == expected_version {
1146 tracing::info!("daemon ready with version {}", expected_version);
1147 return Ok(());
1148 }
1149 tracing::debug!(
1151 "daemon has version {}, waiting for {}",
1152 info.version,
1153 expected_version
1154 );
1155 }
1156 std::thread::sleep(backoff);
1157 backoff = std::cmp::min(backoff * 2, Duration::from_millis(500));
1158 }
1159 Err(_) => {
1160 std::thread::sleep(backoff);
1161 backoff = std::cmp::min(backoff * 2, Duration::from_millis(200));
1162 }
1163 }
1164 }
1165
1166 Err(IpcError::DaemonUnavailable(format!(
1167 "timed out waiting for daemon version {}",
1168 expected_version
1169 )))
1170}
1171
1172fn kill_daemon_forcefully(pid: u32, socket: &PathBuf) -> Result<(), IpcError> {
1174 use nix::sys::signal::{Signal, kill};
1175 use nix::unistd::Pid;
1176
1177 let nix_pid = Pid::from_raw(pid as i32);
1178
1179 if let Err(e) = kill(nix_pid, Signal::SIGTERM) {
1181 if e == nix::errno::Errno::ESRCH {
1183 let _ = fs::remove_file(socket);
1184 let _ = fs::remove_file(socket.with_file_name("daemon.meta.json"));
1185 return Ok(());
1186 }
1187 return Err(IpcError::DaemonUnavailable(format!(
1188 "failed to stop daemon pid {pid}: {e}"
1189 )));
1190 }
1191
1192 let deadline = SystemTime::now() + Duration::from_secs(3);
1194 while SystemTime::now() < deadline {
1195 if UnixStream::connect(socket).is_err() {
1196 return Ok(());
1197 }
1198 std::thread::sleep(Duration::from_millis(50));
1199 }
1200
1201 tracing::warn!(
1203 "daemon pid {} did not stop gracefully, sending SIGKILL",
1204 pid
1205 );
1206 if let Err(e) = kill(nix_pid, Signal::SIGKILL)
1207 && e != nix::errno::Errno::ESRCH
1208 {
1209 return Err(IpcError::DaemonUnavailable(format!(
1210 "failed to kill daemon pid {pid}: {e}"
1211 )));
1212 }
1213
1214 let deadline = SystemTime::now() + Duration::from_secs(2);
1216 while SystemTime::now() < deadline {
1217 if UnixStream::connect(socket).is_err() {
1218 return Ok(());
1219 }
1220 std::thread::sleep(Duration::from_millis(50));
1221 }
1222
1223 let _ = fs::remove_file(socket);
1225 let _ = fs::remove_file(socket.with_file_name("daemon.meta.json"));
1226 Ok(())
1227}
1228
1229fn try_restart_daemon_by_socket(socket: &PathBuf) -> Result<(), IpcError> {
1233 if let Some(meta) = read_daemon_meta() {
1235 tracing::debug!("found daemon pid {} from meta file", meta.pid);
1236 return kill_daemon_forcefully(meta.pid, socket);
1237 }
1238
1239 tracing::warn!("no daemon meta file found, removing stale socket");
1241
1242 if let Err(e) = fs::remove_file(socket)
1245 && e.kind() != std::io::ErrorKind::NotFound
1246 {
1247 return Err(IpcError::DaemonUnavailable(format!(
1248 "failed to remove stale socket: {e}"
1249 )));
1250 }
1251
1252 let _ = fs::remove_file(socket.with_file_name("daemon.meta.json"));
1254
1255 Ok(())
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260 use super::*;
1261
1262 #[test]
1263 fn request_roundtrip() {
1264 let req = Request::Create {
1265 repo: PathBuf::from("/test"),
1266 id: None,
1267 parent: None,
1268 title: "test".to_string(),
1269 bead_type: BeadType::Task,
1270 priority: Priority::default(),
1271 description: None,
1272 design: None,
1273 acceptance_criteria: None,
1274 assignee: None,
1275 external_ref: None,
1276 estimated_minutes: None,
1277 labels: Vec::new(),
1278 dependencies: Vec::new(),
1279 };
1280
1281 let json = serde_json::to_string(&req).unwrap();
1282 let parsed: Request = serde_json::from_str(&json).unwrap();
1283
1284 match parsed {
1285 Request::Create { title, .. } => assert_eq!(title, "test"),
1286 _ => panic!("wrong request type"),
1287 }
1288 }
1289
1290 #[test]
1291 fn response_ok() {
1292 let resp = Response::ok(ResponsePayload::synced());
1293 let json = serde_json::to_string(&resp).unwrap();
1294 assert!(json.contains("\"ok\""));
1295 assert!(json.contains("\"result\":\"synced\""));
1297 }
1298
1299 #[test]
1300 fn unit_variants_are_distinguishable() {
1301 let synced = serde_json::to_string(&ResponsePayload::synced()).unwrap();
1303 let initialized = serde_json::to_string(&ResponsePayload::initialized()).unwrap();
1304 let shutting_down = serde_json::to_string(&ResponsePayload::shutting_down()).unwrap();
1305
1306 assert!(synced.contains("\"result\":\"synced\""));
1307 assert!(initialized.contains("\"result\":\"initialized\""));
1308 assert!(shutting_down.contains("\"result\":\"shutting_down\""));
1309
1310 assert!(!synced.contains("null"));
1312 assert!(!initialized.contains("null"));
1313 assert!(!shutting_down.contains("null"));
1314
1315 assert_ne!(synced, initialized);
1317 assert_ne!(synced, shutting_down);
1318 assert_ne!(initialized, shutting_down);
1319 }
1320
1321 #[test]
1322 fn unit_variants_roundtrip() {
1323 let synced_json = serde_json::to_string(&ResponsePayload::synced()).unwrap();
1325 let parsed: ResponsePayload = serde_json::from_str(&synced_json).unwrap();
1326 assert!(matches!(parsed, ResponsePayload::Synced(_)));
1327
1328 let init_json = serde_json::to_string(&ResponsePayload::initialized()).unwrap();
1329 let parsed: ResponsePayload = serde_json::from_str(&init_json).unwrap();
1330 assert!(matches!(parsed, ResponsePayload::Initialized(_)));
1331 }
1332
1333 #[test]
1334 fn response_err() {
1335 let resp = Response::err(ErrorPayload {
1336 code: "not_found".to_string(),
1337 message: "bead not found".to_string(),
1338 details: None,
1339 });
1340 let json = serde_json::to_string(&resp).unwrap();
1341 assert!(json.contains("\"err\""));
1342 assert!(json.contains("not_found"));
1343 }
1344
1345 #[test]
1346 fn ping_info_serializes_as_query() {
1347 let info = crate::api::DaemonInfo {
1348 version: "0.0.0-test".to_string(),
1349 protocol_version: IPC_PROTOCOL_VERSION,
1350 pid: 123,
1351 };
1352 let resp = Response::ok(ResponsePayload::Query(QueryResult::DaemonInfo(info)));
1353 let json = serde_json::to_string(&resp).unwrap();
1354 assert!(json.contains("\"result\":\"daemon_info\""));
1356 assert!(json.contains("\"version\""));
1357 assert!(json.contains("\"protocol_version\""));
1358 assert!(json.contains("\"pid\""));
1359 }
1360
1361 #[test]
1362 fn version_mismatch_error_includes_parse_error() {
1363 let err = IpcError::DaemonVersionMismatch {
1364 daemon: None,
1365 client_version: "0.1.0".into(),
1366 protocol_version: 1,
1367 parse_error: Some("data did not match any variant".into()),
1368 };
1369 assert_eq!(err.code(), "daemon_version_mismatch");
1371 assert!(err.to_string().contains("restart the daemon"));
1373 }
1374
1375 #[test]
1376 fn version_mismatch_is_retryable() {
1377 let err = IpcError::DaemonVersionMismatch {
1378 daemon: None,
1379 client_version: "0.1.0".into(),
1380 protocol_version: 1,
1381 parse_error: None,
1382 };
1383 assert!(err.transience().is_retryable());
1384 }
1385
1386 #[test]
1387 fn invalid_json_would_cause_parse_error() {
1388 let bad_json = r#"{"unexpected": "format"}"#;
1390 let result: Result<Response, _> = serde_json::from_str(bad_json);
1391 assert!(result.is_err());
1392 let err = result.unwrap_err();
1394 assert!(err.to_string().contains("did not match"));
1395 }
1396
1397 mod response_roundtrip {
1399 use super::*;
1400
1401 fn roundtrip_response(resp: Response) {
1402 let json = serde_json::to_string(&resp).unwrap();
1403 let parsed: Response = serde_json::from_str(&json)
1404 .unwrap_or_else(|e| panic!("Failed to parse: {e}\nJSON: {json}"));
1405 let json2 = serde_json::to_string(&parsed).unwrap();
1407 assert_eq!(json, json2, "Roundtrip mismatch");
1408 }
1409
1410 #[test]
1411 fn op_created() {
1412 let resp = Response::ok(ResponsePayload::Op(OpResult::Created {
1413 id: BeadId::parse("bd-abc").unwrap(),
1414 }));
1415 roundtrip_response(resp);
1416 }
1417
1418 #[test]
1419 fn op_updated() {
1420 let resp = Response::ok(ResponsePayload::Op(OpResult::Updated {
1421 id: BeadId::parse("bd-abc").unwrap(),
1422 }));
1423 roundtrip_response(resp);
1424 }
1425
1426 #[test]
1427 fn query_issues_empty() {
1428 let resp = Response::ok(ResponsePayload::Query(QueryResult::Issues(vec![])));
1429 roundtrip_response(resp);
1430 }
1431
1432 #[test]
1433 fn query_daemon_info() {
1434 let info = crate::api::DaemonInfo {
1435 version: "0.1.0".into(),
1436 protocol_version: IPC_PROTOCOL_VERSION,
1437 pid: 12345,
1438 };
1439 let resp = Response::ok(ResponsePayload::Query(QueryResult::DaemonInfo(info)));
1440 roundtrip_response(resp);
1441 }
1442
1443 #[test]
1444 fn synced() {
1445 roundtrip_response(Response::ok(ResponsePayload::synced()));
1446 }
1447
1448 #[test]
1449 fn initialized() {
1450 roundtrip_response(Response::ok(ResponsePayload::initialized()));
1451 }
1452
1453 #[test]
1454 fn shutting_down() {
1455 roundtrip_response(Response::ok(ResponsePayload::shutting_down()));
1456 }
1457
1458 #[test]
1459 fn error_response() {
1460 let resp = Response::err(ErrorPayload {
1461 code: "test_error".into(),
1462 message: "Something went wrong".into(),
1463 details: Some(serde_json::json!({"key": "value"})),
1464 });
1465 roundtrip_response(resp);
1466 }
1467 }
1468}