1use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use libgrite_core::config::repo_sled_path;
17use libgrite_core::store::IssueFilter;
18use libgrite_core::types::ids::{hex_to_id, ActorId};
19use libgrite_core::{GriteError, GriteStore, LockedStore};
20use libgrite_ipc::{DaemonLock, IpcCommand, IpcResponse, Notification};
21use tokio::sync::mpsc;
22use tracing::{debug, error, info, warn};
23
24use crate::error::DaemonError;
25use crate::state::{AtomicWorkerState, WorkerState};
26
/// Messages a [`Worker`] receives over its `mpsc` channel and handles in
/// [`Worker::run`].
pub enum WorkerMessage {
    /// Execute an IPC command and deliver the outcome on `response_tx`.
    Command {
        /// Identifier copied into the [`IpcResponse`] built for this command.
        request_id: String,
        /// Hex-encoded actor ID; the worker decodes it with `hex_to_id` and
        /// answers with an `invalid_actor` error if decoding fails.
        actor_id: String,
        /// The command to execute.
        command: IpcCommand,
        /// One-shot channel on which the response is sent back.
        response_tx: tokio::sync::oneshot::Sender<IpcResponse>,
    },
    /// Ask the worker to refresh its daemon lock.
    Heartbeat,
    /// Ask the worker to leave its message loop and shut down.
    Shutdown,
}
42
/// Per-repository worker: owns the locked store and executes the commands
/// delivered as [`WorkerMessage`]s.
pub struct Worker {
    /// Root directory of the repository this worker serves.
    pub repo_root: PathBuf,
    /// `<repo_root>/.git`.
    git_dir: PathBuf,
    /// `<git_dir>/grite`; the directory passed to the `DaemonLock` calls.
    grite_dir: PathBuf,
    /// Store location derived from `git_dir` via `repo_sled_path`.
    sled_path: PathBuf,
    /// Locked store handle, shared with the blocking tasks that run commands.
    store: Arc<LockedStore>,
    /// Incoming message channel.
    rx: mpsc::Receiver<WorkerMessage>,
    /// Outgoing notification channel (used for `Notification::WorkerStarted`).
    notify_tx: mpsc::Sender<Notification>,
    /// Host identifier recorded when the daemon lock is acquired.
    host_id: String,
    /// IPC endpoint recorded when the daemon lock is acquired.
    ipc_endpoint: String,
    /// Actor ID recorded as the lock owner and reported in `WorkerStarted`.
    owner_actor_id: String,
    /// Shared lifecycle state, updated by the worker as it runs.
    pub state: Arc<AtomicWorkerState>,
}
68
69impl Worker {
70 pub fn new(
72 repo_root: PathBuf,
73 owner_actor_id: String,
74 rx: mpsc::Receiver<WorkerMessage>,
75 notify_tx: mpsc::Sender<Notification>,
76 host_id: String,
77 ipc_endpoint: String,
78 ) -> Result<Self, DaemonError> {
79 let git_dir = repo_root.join(".git");
80 let grite_dir = git_dir.join("grite");
81 let sled_path = repo_sled_path(&git_dir);
82
83 let state = Arc::new(AtomicWorkerState::new(WorkerState::Initializing));
86
87 let store = Arc::new(GriteStore::open_locked_blocking(
88 &sled_path,
89 Duration::from_secs(5),
90 )?);
91
92 state.store(WorkerState::Idle, Ordering::SeqCst);
93
94 Ok(Self {
95 repo_root,
96 git_dir,
97 grite_dir,
98 sled_path,
99 store,
100 rx,
101 notify_tx,
102 host_id,
103 ipc_endpoint,
104 owner_actor_id,
105 state,
106 })
107 }
108
109 pub fn acquire_lock(&self) -> Result<DaemonLock, DaemonError> {
111 DaemonLock::acquire(
112 &self.grite_dir,
113 self.repo_root.to_string_lossy().to_string(),
114 self.owner_actor_id.clone(),
115 self.host_id.clone(),
116 self.ipc_endpoint.clone(),
117 )
118 .map_err(|e| DaemonError::LockFailed(e.to_string()))
119 }
120
121 pub fn refresh_lock(&self) -> Result<(), DaemonError> {
123 if let Ok(Some(mut lock)) = DaemonLock::read(&self.grite_dir) {
124 if lock.is_owned_by_current_process() {
125 lock.refresh();
126 lock.write(&self.grite_dir)?;
127 }
128 }
129 Ok(())
130 }
131
132 pub async fn run(mut self) {
134 info!(
135 repo = %self.repo_root.display(),
136 "Worker started"
137 );
138
139 match self.acquire_lock() {
141 Ok(_lock) => {
142 debug!("Daemon lock acquired");
143 }
144 Err(e) => {
145 error!("Failed to acquire lock: {}", e);
146 return;
147 }
148 }
149
150 let _ = self
152 .notify_tx
153 .send(Notification::WorkerStarted {
154 repo_root: self.repo_root.to_string_lossy().to_string(),
155 actor_id: self.owner_actor_id.clone(),
156 })
157 .await;
158
159 let in_flight = Arc::new(AtomicUsize::new(0));
161 let worker_state = Arc::clone(&self.state);
162
163 while let Some(msg) = self.rx.recv().await {
165 match msg {
166 WorkerMessage::Command {
167 request_id,
168 actor_id,
169 command,
170 response_tx,
171 } => {
172 let actor_id_bytes: ActorId = match hex_to_id(&actor_id) {
174 Ok(b) => b,
175 Err(e) => {
176 let resp = IpcResponse::error(
177 request_id,
178 "invalid_actor".to_string(),
179 format!("Invalid actor ID: {}", e),
180 );
181 let _ = response_tx.send(resp);
182 continue;
183 }
184 };
185
186 let store = Arc::clone(&self.store);
188 let sled_path = self.sled_path.clone();
189 let git_dir = self.git_dir.clone();
190 let in_flight = Arc::clone(&in_flight);
191 let state = Arc::clone(&worker_state);
192
193 let was_idle = in_flight.load(Ordering::SeqCst) == 0;
194 in_flight.fetch_add(1, Ordering::SeqCst);
195 if was_idle {
196 state.store(WorkerState::Busy, Ordering::SeqCst);
197 }
198
199 tokio::task::spawn_blocking(move || {
202 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
203 execute_command(
204 &store,
205 actor_id_bytes,
206 &sled_path,
207 &git_dir,
208 &request_id,
209 &command,
210 )
211 }));
212 let response = match result {
213 Ok(resp) => resp,
214 Err(_) => IpcResponse::error(
215 request_id,
216 "panic".to_string(),
217 "Command handler panicked".to_string(),
218 ),
219 };
220 let _ = response_tx.send(response);
221 let remaining = in_flight.fetch_sub(1, Ordering::SeqCst);
222 if remaining == 1 {
223 state.store(WorkerState::Idle, Ordering::SeqCst);
224 }
225 });
226 }
227 WorkerMessage::Heartbeat => {
228 if let Err(e) = self.refresh_lock() {
229 warn!("Failed to refresh lock: {}", e);
230 }
231 }
232 WorkerMessage::Shutdown => {
233 worker_state.store(WorkerState::ShuttingDown, Ordering::SeqCst);
234 info!("Worker shutdown requested");
235 break;
236 }
237 }
238 }
239
240 let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
242 while in_flight.load(Ordering::SeqCst) > 0 {
243 if tokio::time::Instant::now() >= deadline {
244 warn!(
245 "Timed out waiting for {} in-flight commands",
246 in_flight.load(Ordering::SeqCst)
247 );
248 break;
249 }
250 tokio::time::sleep(Duration::from_millis(50)).await;
251 }
252
253 self.shutdown();
255 self.state.store(WorkerState::Stopped, Ordering::SeqCst);
256 }
257
258 fn shutdown(&self) {
260 if let Err(e) = DaemonLock::release(&self.grite_dir) {
262 warn!("Failed to release lock: {}", e);
263 }
264
265 if let Err(e) = self.store.flush() {
267 warn!("Failed to flush store: {}", e);
268 }
269
270 info!(
271 repo = %self.repo_root.display(),
272 "Worker stopped"
273 );
274 }
275}
276
277fn execute_command(
281 store: &LockedStore,
282 actor_id_bytes: ActorId,
283 sled_path: &Path,
284 git_dir: &Path,
285 request_id: &str,
286 command: &IpcCommand,
287) -> IpcResponse {
288 let result = execute_command_inner(store, actor_id_bytes, sled_path, git_dir, command);
289
290 match result {
291 Ok(data) => IpcResponse::success(request_id.to_string(), data),
292 Err(e) => {
293 let (code, message) = error_to_code_message(&e);
294 IpcResponse::error(request_id.to_string(), code, message)
295 }
296 }
297}
298
299fn execute_command_inner(
301 store: &LockedStore,
302 actor_id_bytes: ActorId,
303 sled_path: &Path,
304 git_dir: &Path,
305 command: &IpcCommand,
306) -> Result<Option<String>, DaemonError> {
307 use libgrite_core::export::{export_json, export_markdown, ExportSince};
308 use libgrite_core::hash::compute_event_id;
309 use libgrite_core::types::event::{Event, EventKind, IssueState};
310 use libgrite_core::types::ids::{generate_issue_id, id_to_hex};
311 use libgrite_core::types::issue::IssueProjection;
312 use libgrite_git::{SyncManager, WalManager};
313
314 let wal = match WalManager::open(git_dir) {
316 Ok(w) => Some(w),
317 Err(e) => {
318 warn!("WAL open failed (sled-only mode): {}", e);
319 None
320 }
321 };
322
323 fn persist_events(
326 store: &LockedStore,
327 wal: Option<&WalManager>,
328 actor_id: &ActorId,
329 events: &[Event],
330 ) -> Result<(), DaemonError> {
331 for event in events {
332 store.insert_event(event)?;
333 }
334 store.flush()?;
335
336 if let Some(w) = wal {
337 if let Err(e) = w.append(actor_id, events) {
338 warn!("Failed to append to WAL: {}", e);
339 }
340 }
341
342 Ok(())
343 }
344
345 match command {
346 IpcCommand::IssueList { state, label } => {
347 let filter = IssueFilter {
348 state: state.as_ref().map(|s| match s.as_str() {
349 "open" => IssueState::Open,
350 "closed" => IssueState::Closed,
351 _ => IssueState::Open,
352 }),
353 label: label.clone(),
354 };
355 let issues = store.list_issues(&filter)?;
356 let summaries: Vec<serde_json::Value> = issues
357 .iter()
358 .map(|s| {
359 serde_json::json!({
360 "issue_id": id_to_hex(&s.issue_id),
361 "title": s.title,
362 "state": format!("{:?}", s.state).to_lowercase(),
363 "labels": s.labels,
364 "assignees": s.assignees,
365 "created_ts": s.created_ts,
366 "updated_ts": s.updated_ts,
367 "comment_count": s.comment_count,
368 })
369 })
370 .collect();
371 let json = serde_json::to_string(&serde_json::json!({ "issues": summaries }))?;
372 Ok(Some(json))
373 }
374
375 IpcCommand::IssueShow { issue_id } => {
376 let id = store
377 .resolve_issue_id(issue_id)
378 .map_err(DaemonError::Core)?;
379 let p = store.get_issue(&id)?.ok_or_else(|| {
380 DaemonError::Core(GriteError::NotFound(format!(
381 "Issue {} not found",
382 issue_id
383 )))
384 })?;
385
386 let json = serde_json::to_string(&projection_to_json(&p))?;
387 Ok(Some(json))
388 }
389
390 IpcCommand::IssueCreate {
391 title,
392 body,
393 labels,
394 } => {
395 let issue_id = generate_issue_id();
396 let ts = current_time_ms();
397 let kind = EventKind::IssueCreated {
398 title: title.clone(),
399 body: body.clone(),
400 labels: labels.clone(),
401 };
402 let event_id = compute_event_id(&issue_id, &actor_id_bytes, ts, None, &kind);
403 let event = Event::new(event_id, issue_id, actor_id_bytes, ts, None, kind);
404
405 persist_events(
406 store,
407 wal.as_ref(),
408 &actor_id_bytes,
409 std::slice::from_ref(&event),
410 )?;
411
412 let projection = IssueProjection::from_event(&event)?;
413 let mut json_val = projection_to_json(&projection);
414 json_val["event_id"] = serde_json::Value::String(id_to_hex(&event_id));
415 json_val["action"] =
416 serde_json::Value::String(libgrite_ipc::issue_action::CREATED.to_string());
417 let json = serde_json::to_string(&json_val)?;
418 Ok(Some(json))
419 }
420
421 IpcCommand::IssueUpdate {
422 issue_id,
423 title,
424 body,
425 } => {
426 if title.is_none() && body.is_none() {
427 return Err(DaemonError::Core(GriteError::InvalidArgs(
428 "At least one of title or body must be provided".to_string(),
429 )));
430 }
431
432 let id = store
433 .resolve_issue_id(issue_id)
434 .map_err(DaemonError::Core)?;
435 store.get_issue(&id)?.ok_or_else(|| {
436 DaemonError::Core(GriteError::NotFound(format!(
437 "Issue {} not found",
438 issue_id
439 )))
440 })?;
441
442 let ts = current_time_ms();
443 let kind = EventKind::IssueUpdated {
444 title: title.clone(),
445 body: body.clone(),
446 };
447 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
448 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
449
450 persist_events(
451 store,
452 wal.as_ref(),
453 &actor_id_bytes,
454 std::slice::from_ref(&event),
455 )?;
456
457 let json = serde_json::to_string(&serde_json::json!({
458 "issue_id": issue_id,
459 "event_id": id_to_hex(&event_id),
460 }))?;
461 Ok(Some(json))
462 }
463
464 IpcCommand::IssueComment { issue_id, body } => {
465 let id = store
466 .resolve_issue_id(issue_id)
467 .map_err(DaemonError::Core)?;
468 store.get_issue(&id)?.ok_or_else(|| {
469 DaemonError::Core(GriteError::NotFound(format!(
470 "Issue {} not found",
471 issue_id
472 )))
473 })?;
474
475 let ts = current_time_ms();
476 let kind = EventKind::CommentAdded { body: body.clone() };
477 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
478 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
479
480 persist_events(
481 store,
482 wal.as_ref(),
483 &actor_id_bytes,
484 std::slice::from_ref(&event),
485 )?;
486
487 let json = serde_json::to_string(&serde_json::json!({
488 "issue_id": issue_id,
489 "event_id": id_to_hex(&event_id),
490 }))?;
491 Ok(Some(json))
492 }
493
494 IpcCommand::IssueClose { issue_id } => {
495 let id = store
496 .resolve_issue_id(issue_id)
497 .map_err(DaemonError::Core)?;
498 store.get_issue(&id)?.ok_or_else(|| {
499 DaemonError::Core(GriteError::NotFound(format!(
500 "Issue {} not found",
501 issue_id
502 )))
503 })?;
504
505 let ts = current_time_ms();
506 let kind = EventKind::StateChanged {
507 state: IssueState::Closed,
508 };
509 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
510 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
511
512 persist_events(
513 store,
514 wal.as_ref(),
515 &actor_id_bytes,
516 std::slice::from_ref(&event),
517 )?;
518
519 let json = serde_json::to_string(&serde_json::json!({
520 "issue_id": issue_id,
521 "event_id": id_to_hex(&event_id),
522 "state": "closed",
523 "action": libgrite_ipc::issue_action::CLOSED,
524 }))?;
525 Ok(Some(json))
526 }
527
528 IpcCommand::IssueReopen { issue_id } => {
529 let id = store
530 .resolve_issue_id(issue_id)
531 .map_err(DaemonError::Core)?;
532 store.get_issue(&id)?.ok_or_else(|| {
533 DaemonError::Core(GriteError::NotFound(format!(
534 "Issue {} not found",
535 issue_id
536 )))
537 })?;
538
539 let ts = current_time_ms();
540 let kind = EventKind::StateChanged {
541 state: IssueState::Open,
542 };
543 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
544 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
545
546 persist_events(
547 store,
548 wal.as_ref(),
549 &actor_id_bytes,
550 std::slice::from_ref(&event),
551 )?;
552
553 let json = serde_json::to_string(&serde_json::json!({
554 "issue_id": issue_id,
555 "event_id": id_to_hex(&event_id),
556 "state": "open",
557 "action": libgrite_ipc::issue_action::REOPENED,
558 }))?;
559 Ok(Some(json))
560 }
561
562 IpcCommand::IssueLabel {
563 issue_id,
564 add,
565 remove,
566 } => {
567 let id = store
568 .resolve_issue_id(issue_id)
569 .map_err(DaemonError::Core)?;
570 store.get_issue(&id)?.ok_or_else(|| {
571 DaemonError::Core(GriteError::NotFound(format!(
572 "Issue {} not found",
573 issue_id
574 )))
575 })?;
576
577 let mut event_ids = Vec::new();
578 let mut events = Vec::new();
579 let ts = current_time_ms();
580
581 for label in add {
582 let kind = EventKind::LabelAdded {
583 label: label.clone(),
584 };
585 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
586 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
587 event_ids.push(id_to_hex(&event_id));
588 events.push(event);
589 }
590
591 for label in remove {
592 let kind = EventKind::LabelRemoved {
593 label: label.clone(),
594 };
595 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
596 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
597 event_ids.push(id_to_hex(&event_id));
598 events.push(event);
599 }
600
601 persist_events(store, wal.as_ref(), &actor_id_bytes, &events)?;
602
603 let json = serde_json::to_string(&serde_json::json!({
604 "issue_id": issue_id,
605 "event_ids": event_ids,
606 }))?;
607 Ok(Some(json))
608 }
609
610 IpcCommand::IssueAssign {
611 issue_id,
612 add,
613 remove,
614 } => {
615 let id = store
616 .resolve_issue_id(issue_id)
617 .map_err(DaemonError::Core)?;
618 store.get_issue(&id)?.ok_or_else(|| {
619 DaemonError::Core(GriteError::NotFound(format!(
620 "Issue {} not found",
621 issue_id
622 )))
623 })?;
624
625 let mut event_ids = Vec::new();
626 let mut events = Vec::new();
627 let ts = current_time_ms();
628
629 for user in add {
630 let kind = EventKind::AssigneeAdded { user: user.clone() };
631 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
632 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
633 event_ids.push(id_to_hex(&event_id));
634 events.push(event);
635 }
636
637 for user in remove {
638 let kind = EventKind::AssigneeRemoved { user: user.clone() };
639 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
640 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
641 event_ids.push(id_to_hex(&event_id));
642 events.push(event);
643 }
644
645 persist_events(store, wal.as_ref(), &actor_id_bytes, &events)?;
646
647 let json = serde_json::to_string(&serde_json::json!({
648 "issue_id": issue_id,
649 "event_ids": event_ids,
650 }))?;
651 Ok(Some(json))
652 }
653
654 IpcCommand::IssueLink {
655 issue_id,
656 url,
657 note,
658 } => {
659 let id = store
660 .resolve_issue_id(issue_id)
661 .map_err(DaemonError::Core)?;
662 store.get_issue(&id)?.ok_or_else(|| {
663 DaemonError::Core(GriteError::NotFound(format!(
664 "Issue {} not found",
665 issue_id
666 )))
667 })?;
668
669 let ts = current_time_ms();
670 let kind = EventKind::LinkAdded {
671 url: url.clone(),
672 note: note.clone(),
673 };
674 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
675 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
676
677 persist_events(
678 store,
679 wal.as_ref(),
680 &actor_id_bytes,
681 std::slice::from_ref(&event),
682 )?;
683
684 let json = serde_json::to_string(&serde_json::json!({
685 "issue_id": issue_id,
686 "event_id": id_to_hex(&event_id),
687 }))?;
688 Ok(Some(json))
689 }
690
691 IpcCommand::IssueAttach {
692 issue_id,
693 file_path,
694 } => {
695 let id = store
696 .resolve_issue_id(issue_id)
697 .map_err(DaemonError::Core)?;
698 store.get_issue(&id)?.ok_or_else(|| {
699 DaemonError::Core(GriteError::NotFound(format!(
700 "Issue {} not found",
701 issue_id
702 )))
703 })?;
704
705 let parts: Vec<&str> = file_path.splitn(3, ':').collect();
706 if parts.len() != 3 {
707 return Err(DaemonError::Core(GriteError::InvalidArgs(
708 "file_path must be in format 'name:sha256:mime'".to_string(),
709 )));
710 }
711
712 let name = parts[0].to_string();
713 let sha256: [u8; 32] = hex_to_id(parts[1])
714 .map_err(|e| DaemonError::Core(GriteError::InvalidArgs(e.to_string())))?;
715 let mime = parts[2].to_string();
716
717 let ts = current_time_ms();
718 let kind = EventKind::AttachmentAdded { name, sha256, mime };
719 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
720 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
721
722 persist_events(
723 store,
724 wal.as_ref(),
725 &actor_id_bytes,
726 std::slice::from_ref(&event),
727 )?;
728
729 let json = serde_json::to_string(&serde_json::json!({
730 "issue_id": issue_id,
731 "event_id": id_to_hex(&event_id),
732 }))?;
733 Ok(Some(json))
734 }
735
736 IpcCommand::DbStats => {
737 let stats = store.stats(sled_path)?;
738 let json = serde_json::to_string(&serde_json::json!({
739 "path": stats.path,
740 "size_bytes": stats.size_bytes,
741 "event_count": stats.event_count,
742 "issue_count": stats.issue_count,
743 "last_rebuild_ts": stats.last_rebuild_ts,
744 }))?;
745 Ok(Some(json))
746 }
747
748 IpcCommand::Rebuild => {
749 let stats = store.rebuild()?;
750 let json = serde_json::to_string(&serde_json::json!({
751 "event_count": stats.event_count,
752 "issue_count": stats.issue_count,
753 }))?;
754 Ok(Some(json))
755 }
756
757 IpcCommand::Export { format, since } => {
758 let since_opt = since
759 .as_ref()
760 .and_then(|s| s.parse::<u64>().ok())
761 .map(ExportSince::Timestamp);
762
763 let output = match format.as_str() {
764 "json" => {
765 let export = export_json(store, since_opt)?;
766 serde_json::to_string(&export)?
767 }
768 "md" | "markdown" => export_markdown(store, since_opt)?,
769 _ => {
770 return Err(DaemonError::Core(GriteError::InvalidArgs(format!(
771 "Unknown format: {}",
772 format
773 ))))
774 }
775 };
776 Ok(Some(output))
777 }
778
779 IpcCommand::IssueDepAdd {
780 issue_id,
781 target_id,
782 dep_type,
783 } => {
784 use libgrite_core::hash::compute_event_id;
785 use libgrite_core::types::event::{DependencyType, Event, EventKind};
786 use libgrite_core::types::ids::id_to_hex;
787
788 let id = store
789 .resolve_issue_id(issue_id)
790 .map_err(DaemonError::Core)?;
791 let target = store
792 .resolve_issue_id(target_id)
793 .map_err(DaemonError::Core)?;
794 let dep = DependencyType::from_str(dep_type).ok_or_else(|| {
795 DaemonError::Core(GriteError::InvalidArgs(format!(
796 "Invalid dep type: {}",
797 dep_type
798 )))
799 })?;
800
801 store.get_issue(&id)?.ok_or_else(|| {
802 DaemonError::Core(GriteError::NotFound(format!(
803 "Issue {} not found",
804 issue_id
805 )))
806 })?;
807 store.get_issue(&target)?.ok_or_else(|| {
808 DaemonError::Core(GriteError::NotFound(format!(
809 "Target {} not found",
810 target_id
811 )))
812 })?;
813
814 if store.would_create_cycle(&id, &target, &dep)? {
815 return Err(DaemonError::Core(GriteError::InvalidArgs(format!(
816 "Adding this dependency would create a cycle in the {} graph",
817 dep.as_str()
818 ))));
819 }
820
821 let ts = current_time_ms();
822 let kind = EventKind::DependencyAdded {
823 target,
824 dep_type: dep,
825 };
826 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
827 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
828 persist_events(
829 store,
830 wal.as_ref(),
831 &actor_id_bytes,
832 std::slice::from_ref(&event),
833 )?;
834
835 let json = serde_json::to_string(&serde_json::json!({
836 "event_id": id_to_hex(&event_id),
837 "issue_id": issue_id,
838 "target": target_id,
839 "dep_type": dep_type,
840 "action": "added",
841 }))?;
842 Ok(Some(json))
843 }
844
845 IpcCommand::IssueDepRemove {
846 issue_id,
847 target_id,
848 dep_type,
849 } => {
850 use libgrite_core::hash::compute_event_id;
851 use libgrite_core::types::event::{DependencyType, Event, EventKind};
852 use libgrite_core::types::ids::id_to_hex;
853
854 let id = store
855 .resolve_issue_id(issue_id)
856 .map_err(DaemonError::Core)?;
857 let target = store
858 .resolve_issue_id(target_id)
859 .map_err(DaemonError::Core)?;
860 let dep = DependencyType::from_str(dep_type).ok_or_else(|| {
861 DaemonError::Core(GriteError::InvalidArgs(format!(
862 "Invalid dep type: {}",
863 dep_type
864 )))
865 })?;
866
867 let ts = current_time_ms();
868 let kind = EventKind::DependencyRemoved {
869 target,
870 dep_type: dep,
871 };
872 let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
873 let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
874 persist_events(
875 store,
876 wal.as_ref(),
877 &actor_id_bytes,
878 std::slice::from_ref(&event),
879 )?;
880
881 let json = serde_json::to_string(&serde_json::json!({
882 "event_id": id_to_hex(&event_id),
883 "issue_id": issue_id,
884 "target": target_id,
885 "dep_type": dep_type,
886 "action": "removed",
887 }))?;
888 Ok(Some(json))
889 }
890
891 IpcCommand::IssueDepList { issue_id, reverse } => {
892 use libgrite_core::types::ids::id_to_hex;
893
894 let id = store
895 .resolve_issue_id(issue_id)
896 .map_err(DaemonError::Core)?;
897 let deps = if *reverse {
898 store.get_dependents(&id)?
899 } else {
900 store.get_dependencies(&id)?
901 };
902 let dep_list: Vec<serde_json::Value> = deps
903 .iter()
904 .map(|(target, dep_type)| {
905 let title = match store.get_issue(target) {
906 Ok(Some(p)) => p.title.clone(),
907 Ok(None) => "?".to_string(),
908 Err(e) => return Err(DaemonError::Core(e)),
909 };
910 Ok(serde_json::json!({
911 "issue_id": id_to_hex(target),
912 "dep_type": dep_type.as_str(),
913 "title": title,
914 }))
915 })
916 .collect::<Result<Vec<_>, DaemonError>>()?;
917 let json = serde_json::to_string(&serde_json::json!({
918 "issue_id": issue_id,
919 "direction": if *reverse { "dependents" } else { "dependencies" },
920 "deps": dep_list,
921 }))?;
922 Ok(Some(json))
923 }
924
925 IpcCommand::IssueDepTopo { state, label } => {
926 use libgrite_core::types::event::IssueState;
927 use libgrite_core::types::ids::id_to_hex;
928
929 let filter = IssueFilter {
930 state: state.as_deref().map(|s| match s {
931 "closed" => IssueState::Closed,
932 _ => IssueState::Open,
933 }),
934 label: label.clone(),
935 };
936 let sorted = store.topological_order(&filter)?;
937 let issues: Vec<serde_json::Value> = sorted
938 .iter()
939 .map(|s| {
940 serde_json::json!({
941 "issue_id": id_to_hex(&s.issue_id),
942 "title": s.title,
943 "state": format!("{:?}", s.state).to_lowercase(),
944 "labels": s.labels,
945 })
946 })
947 .collect();
948 let json = serde_json::to_string(&serde_json::json!({
949 "issues": issues,
950 "order": "topological",
951 }))?;
952 Ok(Some(json))
953 }
954
955 IpcCommand::DaemonStatus | IpcCommand::DaemonStop => Err(DaemonError::Core(
958 GriteError::Internal("supervisor-only command received by worker".to_string()),
959 )),
960
961 IpcCommand::Sync { remote, pull, push } => {
962 let sync_mgr = SyncManager::open(git_dir)?;
963
964 let do_pull = *pull || !*push;
966 let do_push = *push || !*pull;
967
968 if do_push {
970 if let Some(w) = wal.as_ref() {
971 if w.head().unwrap_or(None).is_none() {
972 let events = store.get_all_events().unwrap_or_default();
973 if !events.is_empty() {
974 let mut sorted = events;
975 sorted.sort_by_key(|e| e.ts_unix_ms);
976 match w.append(&actor_id_bytes, &sorted) {
977 Ok(_) => info!("Auto-backfilled WAL with {} events", sorted.len()),
978 Err(e) => warn!("WAL backfill failed: {}", e),
979 }
980 }
981 }
982 }
983 }
984
985 let result = if do_pull && !do_push {
986 let pull_result = sync_mgr.pull(remote)?;
988 let wal_head: Option<String> = pull_result.new_wal_head.map(|oid| oid.to_string());
989 serde_json::json!({
990 "pulled": true,
991 "pushed": false,
992 "pull_events": pull_result.events_pulled,
993 "pull_wal_head": wal_head,
994 "message": pull_result.message,
995 })
996 } else if do_push && !do_pull {
997 let push_result = sync_mgr.push_with_rebase(remote, &actor_id_bytes)?;
999 serde_json::json!({
1000 "pulled": false,
1001 "pushed": true,
1002 "push_success": push_result.success,
1003 "push_rebased": push_result.rebased,
1004 "push_events_rebased": push_result.events_rebased,
1005 "message": push_result.message,
1006 })
1007 } else {
1008 let (pull_result, push_result) =
1010 sync_mgr.sync_with_rebase(remote, &actor_id_bytes)?;
1011 let wal_head: Option<String> = pull_result.new_wal_head.map(|oid| oid.to_string());
1012 serde_json::json!({
1013 "pulled": true,
1014 "pushed": true,
1015 "pull_events": pull_result.events_pulled,
1016 "pull_wal_head": wal_head,
1017 "push_success": push_result.success,
1018 "push_rebased": push_result.rebased,
1019 "push_events_rebased": push_result.events_rebased,
1020 "message": format!("{} / {}", pull_result.message, push_result.message),
1021 })
1022 };
1023
1024 Ok(Some(result.to_string()))
1025 }
1026
1027 IpcCommand::SnapshotCreate | IpcCommand::SnapshotList | IpcCommand::SnapshotGc { .. } => {
1028 Err(DaemonError::Core(GriteError::Internal(
1029 "Snapshot through daemon not yet implemented - use --no-daemon".to_string(),
1030 )))
1031 }
1032 }
1033}
1034
1035fn projection_to_json(p: &libgrite_core::types::issue::IssueProjection) -> serde_json::Value {
1037 use libgrite_core::types::ids::id_to_hex;
1038
1039 let comments: Vec<serde_json::Value> = p
1040 .comments
1041 .iter()
1042 .map(|c| {
1043 serde_json::json!({
1044 "event_id": id_to_hex(&c.event_id),
1045 "actor": id_to_hex(&c.actor),
1046 "ts_unix_ms": c.ts_unix_ms,
1047 "body": c.body,
1048 })
1049 })
1050 .collect();
1051 let links: Vec<serde_json::Value> = p
1052 .links
1053 .iter()
1054 .map(|l| {
1055 serde_json::json!({
1056 "event_id": id_to_hex(&l.event_id),
1057 "url": l.url,
1058 "note": l.note,
1059 })
1060 })
1061 .collect();
1062 let attachments: Vec<serde_json::Value> = p
1063 .attachments
1064 .iter()
1065 .map(|a| {
1066 serde_json::json!({
1067 "event_id": id_to_hex(&a.event_id),
1068 "name": a.name,
1069 "sha256": hex::encode(a.sha256),
1070 "mime": a.mime,
1071 })
1072 })
1073 .collect();
1074 let deps: Vec<serde_json::Value> = p
1075 .dependencies
1076 .iter()
1077 .map(|d| {
1078 serde_json::json!({
1079 "target": id_to_hex(&d.target),
1080 "dep_type": d.dep_type.as_str(),
1081 })
1082 })
1083 .collect();
1084
1085 serde_json::json!({
1086 "issue_id": id_to_hex(&p.issue_id),
1087 "title": p.title,
1088 "body": p.body,
1089 "state": format!("{:?}", p.state).to_lowercase(),
1090 "labels": p.labels,
1091 "assignees": p.assignees,
1092 "comments": comments,
1093 "links": links,
1094 "attachments": attachments,
1095 "dependencies": deps,
1096 "created_ts": p.created_ts,
1097 "updated_ts": p.updated_ts,
1098 })
1099}
1100
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock set before 1970: fall back to a zero duration.
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
1108
1109fn error_to_code_message(e: &DaemonError) -> (String, String) {
1111 use libgrite_ipc::error::codes;
1112
1113 match e {
1114 DaemonError::Core(GriteError::NotFound(_)) => (codes::NOT_FOUND.to_string(), e.to_string()),
1115 DaemonError::Core(GriteError::InvalidArgs(_)) => {
1116 (codes::INVALID_INPUT.to_string(), e.to_string())
1117 }
1118 DaemonError::Core(GriteError::Io(_)) => (codes::IO_ERROR.to_string(), e.to_string()),
1119 DaemonError::Git(_) => (codes::GIT_ERROR.to_string(), e.to_string()),
1120 DaemonError::Ipc(_) => (codes::IPC_ERROR.to_string(), e.to_string()),
1121 _ => (codes::INTERNAL.to_string(), e.to_string()),
1122 }
1123}