Skip to main content

grite_daemon/
worker.rs

//! Worker module - handles commands for a single repo.
//!
//! Each worker owns exclusive access to the shared sled database for its
//! repository. Commands are processed concurrently on tokio's blocking
//! thread pool, with sled's internal MVCC handling concurrent access safely.
//!
//! Actor ID is supplied per-command rather than being fixed at worker
//! creation time, reflecting the shared-sled model where actor identity
//! is authorship metadata rather than a storage partition.
10
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use libgrite_core::config::repo_sled_path;
17use libgrite_core::store::IssueFilter;
18use libgrite_core::types::ids::{hex_to_id, ActorId};
19use libgrite_core::{GriteError, GriteStore, LockedStore};
20use libgrite_ipc::{DaemonLock, IpcCommand, IpcResponse, Notification};
21use tokio::sync::mpsc;
22use tracing::{debug, error, info, warn};
23
24use crate::error::DaemonError;
25use crate::state::{AtomicWorkerState, WorkerState};
26
27/// Message sent to a worker
28pub enum WorkerMessage {
29    /// Execute a command
30    Command {
31        request_id: String,
32        /// Actor ID (hex) for event authorship
33        actor_id: String,
34        command: IpcCommand,
35        response_tx: tokio::sync::oneshot::Sender<IpcResponse>,
36    },
37    /// Refresh the heartbeat
38    Heartbeat,
39    /// Shutdown the worker
40    Shutdown,
41}
42
43/// Worker state for a single repository
44pub struct Worker {
45    /// Repository root path
46    pub repo_root: PathBuf,
47    /// Git directory (.git or worktree commondir)
48    git_dir: PathBuf,
49    /// Grite data directory (.git/grite) — used for daemon lock
50    grite_dir: PathBuf,
51    /// Sled store path
52    sled_path: PathBuf,
53    /// Sled store with filesystem lock (shared for concurrent access)
54    store: Arc<LockedStore>,
55    /// Channel for receiving messages
56    rx: mpsc::Receiver<WorkerMessage>,
57    /// Notification sender
58    notify_tx: mpsc::Sender<Notification>,
59    /// Host ID for this daemon
60    host_id: String,
61    /// IPC endpoint
62    ipc_endpoint: String,
63    /// Owner actor ID used when acquiring the daemon lock
64    owner_actor_id: String,
65    /// Current lifecycle state
66    pub state: Arc<AtomicWorkerState>,
67}
68
69impl Worker {
70    /// Create a new worker
71    pub fn new(
72        repo_root: PathBuf,
73        owner_actor_id: String,
74        rx: mpsc::Receiver<WorkerMessage>,
75        notify_tx: mpsc::Sender<Notification>,
76        host_id: String,
77        ipc_endpoint: String,
78    ) -> Result<Self, DaemonError> {
79        let git_dir = repo_root.join(".git");
80        let grite_dir = git_dir.join("grite");
81        let sled_path = repo_sled_path(&git_dir);
82
83        // Open store with filesystem lock (blocking with timeout)
84        // This ensures exclusive process-level access to the sled database
85        let state = Arc::new(AtomicWorkerState::new(WorkerState::Initializing));
86
87        let store = Arc::new(GriteStore::open_locked_blocking(
88            &sled_path,
89            Duration::from_secs(5),
90        )?);
91
92        state.store(WorkerState::Idle, Ordering::SeqCst);
93
94        Ok(Self {
95            repo_root,
96            git_dir,
97            grite_dir,
98            sled_path,
99            store,
100            rx,
101            notify_tx,
102            host_id,
103            ipc_endpoint,
104            owner_actor_id,
105            state,
106        })
107    }
108
109    /// Acquire the daemon lock
110    pub fn acquire_lock(&self) -> Result<DaemonLock, DaemonError> {
111        DaemonLock::acquire(
112            &self.grite_dir,
113            self.repo_root.to_string_lossy().to_string(),
114            self.owner_actor_id.clone(),
115            self.host_id.clone(),
116            self.ipc_endpoint.clone(),
117        )
118        .map_err(|e| DaemonError::LockFailed(e.to_string()))
119    }
120
121    /// Refresh the daemon lock heartbeat
122    pub fn refresh_lock(&self) -> Result<(), DaemonError> {
123        if let Ok(Some(mut lock)) = DaemonLock::read(&self.grite_dir) {
124            if lock.is_owned_by_current_process() {
125                lock.refresh();
126                lock.write(&self.grite_dir)?;
127            }
128        }
129        Ok(())
130    }
131
132    /// Run the worker event loop
133    pub async fn run(mut self) {
134        info!(
135            repo = %self.repo_root.display(),
136            "Worker started"
137        );
138
139        // Acquire lock
140        match self.acquire_lock() {
141            Ok(_lock) => {
142                debug!("Daemon lock acquired");
143            }
144            Err(e) => {
145                error!("Failed to acquire lock: {}", e);
146                return;
147            }
148        }
149
150        // Notify worker started
151        let _ = self
152            .notify_tx
153            .send(Notification::WorkerStarted {
154                repo_root: self.repo_root.to_string_lossy().to_string(),
155                actor_id: self.owner_actor_id.clone(),
156            })
157            .await;
158
159        // Track in-flight commands so we can wait for them on shutdown
160        let in_flight = Arc::new(AtomicUsize::new(0));
161        let worker_state = Arc::clone(&self.state);
162
163        // Event loop - commands are spawned as concurrent tasks
164        while let Some(msg) = self.rx.recv().await {
165            match msg {
166                WorkerMessage::Command {
167                    request_id,
168                    actor_id,
169                    command,
170                    response_tx,
171                } => {
172                    // Parse actor ID bytes for event authorship
173                    let actor_id_bytes: ActorId = match hex_to_id(&actor_id) {
174                        Ok(b) => b,
175                        Err(e) => {
176                            let resp = IpcResponse::error(
177                                request_id,
178                                "invalid_actor".to_string(),
179                                format!("Invalid actor ID: {}", e),
180                            );
181                            let _ = response_tx.send(resp);
182                            continue;
183                        }
184                    };
185
186                    // Clone data needed for the spawned task
187                    let store = Arc::clone(&self.store);
188                    let sled_path = self.sled_path.clone();
189                    let git_dir = self.git_dir.clone();
190                    let in_flight = Arc::clone(&in_flight);
191                    let state = Arc::clone(&worker_state);
192
193                    let was_idle = in_flight.load(Ordering::SeqCst) == 0;
194                    in_flight.fetch_add(1, Ordering::SeqCst);
195                    if was_idle {
196                        state.store(WorkerState::Busy, Ordering::SeqCst);
197                    }
198
199                    // Run on the blocking thread pool — sled and git2 do
200                    // synchronous I/O that must not starve the async runtime.
201                    tokio::task::spawn_blocking(move || {
202                        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
203                            execute_command(
204                                &store,
205                                actor_id_bytes,
206                                &sled_path,
207                                &git_dir,
208                                &request_id,
209                                &command,
210                            )
211                        }));
212                        let response = match result {
213                            Ok(resp) => resp,
214                            Err(_) => IpcResponse::error(
215                                request_id,
216                                "panic".to_string(),
217                                "Command handler panicked".to_string(),
218                            ),
219                        };
220                        let _ = response_tx.send(response);
221                        let remaining = in_flight.fetch_sub(1, Ordering::SeqCst);
222                        if remaining == 1 {
223                            state.store(WorkerState::Idle, Ordering::SeqCst);
224                        }
225                    });
226                }
227                WorkerMessage::Heartbeat => {
228                    if let Err(e) = self.refresh_lock() {
229                        warn!("Failed to refresh lock: {}", e);
230                    }
231                }
232                WorkerMessage::Shutdown => {
233                    worker_state.store(WorkerState::ShuttingDown, Ordering::SeqCst);
234                    info!("Worker shutdown requested");
235                    break;
236                }
237            }
238        }
239
240        // Wait for in-flight commands to complete (with timeout)
241        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
242        while in_flight.load(Ordering::SeqCst) > 0 {
243            if tokio::time::Instant::now() >= deadline {
244                warn!(
245                    "Timed out waiting for {} in-flight commands",
246                    in_flight.load(Ordering::SeqCst)
247                );
248                break;
249            }
250            tokio::time::sleep(Duration::from_millis(50)).await;
251        }
252
253        // Cleanup
254        self.shutdown();
255        self.state.store(WorkerState::Stopped, Ordering::SeqCst);
256    }
257
258    /// Shutdown cleanup
259    fn shutdown(&self) {
260        // Release lock
261        if let Err(e) = DaemonLock::release(&self.grite_dir) {
262            warn!("Failed to release lock: {}", e);
263        }
264
265        // Flush store
266        if let Err(e) = self.store.flush() {
267            warn!("Failed to flush store: {}", e);
268        }
269
270        info!(
271            repo = %self.repo_root.display(),
272            "Worker stopped"
273        );
274    }
275}
276
277/// Execute a command with the given context.
278///
279/// This is a standalone function to enable concurrent execution via tokio::spawn.
280fn execute_command(
281    store: &LockedStore,
282    actor_id_bytes: ActorId,
283    sled_path: &Path,
284    git_dir: &Path,
285    request_id: &str,
286    command: &IpcCommand,
287) -> IpcResponse {
288    let result = execute_command_inner(store, actor_id_bytes, sled_path, git_dir, command);
289
290    match result {
291        Ok(data) => IpcResponse::success(request_id.to_string(), data),
292        Err(e) => {
293            let (code, message) = error_to_code_message(&e);
294            IpcResponse::error(request_id.to_string(), code, message)
295        }
296    }
297}
298
299/// Inner command execution logic
300fn execute_command_inner(
301    store: &LockedStore,
302    actor_id_bytes: ActorId,
303    sled_path: &Path,
304    git_dir: &Path,
305    command: &IpcCommand,
306) -> Result<Option<String>, DaemonError> {
307    use libgrite_core::export::{export_json, export_markdown, ExportSince};
308    use libgrite_core::hash::compute_event_id;
309    use libgrite_core::types::event::{Event, EventKind, IssueState};
310    use libgrite_core::types::ids::{generate_issue_id, id_to_hex};
311    use libgrite_core::types::issue::IssueProjection;
312    use libgrite_git::{SyncManager, WalManager};
313
314    // Open WAL (best-effort — sled operations work without it)
315    let wal = match WalManager::open(git_dir) {
316        Ok(w) => Some(w),
317        Err(e) => {
318            warn!("WAL open failed (sled-only mode): {}", e);
319            None
320        }
321    };
322
323    /// Persist events to both sled store and WAL.
324    /// WAL append is best-effort — failures are logged but don't fail the operation.
325    fn persist_events(
326        store: &LockedStore,
327        wal: Option<&WalManager>,
328        actor_id: &ActorId,
329        events: &[Event],
330    ) -> Result<(), DaemonError> {
331        for event in events {
332            store.insert_event(event)?;
333        }
334        store.flush()?;
335
336        if let Some(w) = wal {
337            if let Err(e) = w.append(actor_id, events) {
338                warn!("Failed to append to WAL: {}", e);
339            }
340        }
341
342        Ok(())
343    }
344
345    match command {
346        IpcCommand::IssueList { state, label } => {
347            let filter = IssueFilter {
348                state: state.as_ref().map(|s| match s.as_str() {
349                    "open" => IssueState::Open,
350                    "closed" => IssueState::Closed,
351                    _ => IssueState::Open,
352                }),
353                label: label.clone(),
354            };
355            let issues = store.list_issues(&filter)?;
356            let summaries: Vec<serde_json::Value> = issues
357                .iter()
358                .map(|s| {
359                    serde_json::json!({
360                        "issue_id": id_to_hex(&s.issue_id),
361                        "title": s.title,
362                        "state": format!("{:?}", s.state).to_lowercase(),
363                        "labels": s.labels,
364                        "assignees": s.assignees,
365                        "created_ts": s.created_ts,
366                        "updated_ts": s.updated_ts,
367                        "comment_count": s.comment_count,
368                    })
369                })
370                .collect();
371            let json = serde_json::to_string(&serde_json::json!({ "issues": summaries }))?;
372            Ok(Some(json))
373        }
374
375        IpcCommand::IssueShow { issue_id } => {
376            let id = store
377                .resolve_issue_id(issue_id)
378                .map_err(DaemonError::Core)?;
379            let p = store.get_issue(&id)?.ok_or_else(|| {
380                DaemonError::Core(GriteError::NotFound(format!(
381                    "Issue {} not found",
382                    issue_id
383                )))
384            })?;
385
386            let json = serde_json::to_string(&projection_to_json(&p))?;
387            Ok(Some(json))
388        }
389
390        IpcCommand::IssueCreate {
391            title,
392            body,
393            labels,
394        } => {
395            let issue_id = generate_issue_id();
396            let ts = current_time_ms();
397            let kind = EventKind::IssueCreated {
398                title: title.clone(),
399                body: body.clone(),
400                labels: labels.clone(),
401            };
402            let event_id = compute_event_id(&issue_id, &actor_id_bytes, ts, None, &kind);
403            let event = Event::new(event_id, issue_id, actor_id_bytes, ts, None, kind);
404
405            persist_events(
406                store,
407                wal.as_ref(),
408                &actor_id_bytes,
409                std::slice::from_ref(&event),
410            )?;
411
412            let projection = IssueProjection::from_event(&event)?;
413            let mut json_val = projection_to_json(&projection);
414            json_val["event_id"] = serde_json::Value::String(id_to_hex(&event_id));
415            json_val["action"] =
416                serde_json::Value::String(libgrite_ipc::issue_action::CREATED.to_string());
417            let json = serde_json::to_string(&json_val)?;
418            Ok(Some(json))
419        }
420
421        IpcCommand::IssueUpdate {
422            issue_id,
423            title,
424            body,
425        } => {
426            if title.is_none() && body.is_none() {
427                return Err(DaemonError::Core(GriteError::InvalidArgs(
428                    "At least one of title or body must be provided".to_string(),
429                )));
430            }
431
432            let id = store
433                .resolve_issue_id(issue_id)
434                .map_err(DaemonError::Core)?;
435            store.get_issue(&id)?.ok_or_else(|| {
436                DaemonError::Core(GriteError::NotFound(format!(
437                    "Issue {} not found",
438                    issue_id
439                )))
440            })?;
441
442            let ts = current_time_ms();
443            let kind = EventKind::IssueUpdated {
444                title: title.clone(),
445                body: body.clone(),
446            };
447            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
448            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
449
450            persist_events(
451                store,
452                wal.as_ref(),
453                &actor_id_bytes,
454                std::slice::from_ref(&event),
455            )?;
456
457            let json = serde_json::to_string(&serde_json::json!({
458                "issue_id": issue_id,
459                "event_id": id_to_hex(&event_id),
460            }))?;
461            Ok(Some(json))
462        }
463
464        IpcCommand::IssueComment { issue_id, body } => {
465            let id = store
466                .resolve_issue_id(issue_id)
467                .map_err(DaemonError::Core)?;
468            store.get_issue(&id)?.ok_or_else(|| {
469                DaemonError::Core(GriteError::NotFound(format!(
470                    "Issue {} not found",
471                    issue_id
472                )))
473            })?;
474
475            let ts = current_time_ms();
476            let kind = EventKind::CommentAdded { body: body.clone() };
477            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
478            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
479
480            persist_events(
481                store,
482                wal.as_ref(),
483                &actor_id_bytes,
484                std::slice::from_ref(&event),
485            )?;
486
487            let json = serde_json::to_string(&serde_json::json!({
488                "issue_id": issue_id,
489                "event_id": id_to_hex(&event_id),
490            }))?;
491            Ok(Some(json))
492        }
493
494        IpcCommand::IssueClose { issue_id } => {
495            let id = store
496                .resolve_issue_id(issue_id)
497                .map_err(DaemonError::Core)?;
498            store.get_issue(&id)?.ok_or_else(|| {
499                DaemonError::Core(GriteError::NotFound(format!(
500                    "Issue {} not found",
501                    issue_id
502                )))
503            })?;
504
505            let ts = current_time_ms();
506            let kind = EventKind::StateChanged {
507                state: IssueState::Closed,
508            };
509            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
510            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
511
512            persist_events(
513                store,
514                wal.as_ref(),
515                &actor_id_bytes,
516                std::slice::from_ref(&event),
517            )?;
518
519            let json = serde_json::to_string(&serde_json::json!({
520                "issue_id": issue_id,
521                "event_id": id_to_hex(&event_id),
522                "state": "closed",
523                "action": libgrite_ipc::issue_action::CLOSED,
524            }))?;
525            Ok(Some(json))
526        }
527
528        IpcCommand::IssueReopen { issue_id } => {
529            let id = store
530                .resolve_issue_id(issue_id)
531                .map_err(DaemonError::Core)?;
532            store.get_issue(&id)?.ok_or_else(|| {
533                DaemonError::Core(GriteError::NotFound(format!(
534                    "Issue {} not found",
535                    issue_id
536                )))
537            })?;
538
539            let ts = current_time_ms();
540            let kind = EventKind::StateChanged {
541                state: IssueState::Open,
542            };
543            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
544            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
545
546            persist_events(
547                store,
548                wal.as_ref(),
549                &actor_id_bytes,
550                std::slice::from_ref(&event),
551            )?;
552
553            let json = serde_json::to_string(&serde_json::json!({
554                "issue_id": issue_id,
555                "event_id": id_to_hex(&event_id),
556                "state": "open",
557                "action": libgrite_ipc::issue_action::REOPENED,
558            }))?;
559            Ok(Some(json))
560        }
561
562        IpcCommand::IssueLabel {
563            issue_id,
564            add,
565            remove,
566        } => {
567            let id = store
568                .resolve_issue_id(issue_id)
569                .map_err(DaemonError::Core)?;
570            store.get_issue(&id)?.ok_or_else(|| {
571                DaemonError::Core(GriteError::NotFound(format!(
572                    "Issue {} not found",
573                    issue_id
574                )))
575            })?;
576
577            let mut event_ids = Vec::new();
578            let mut events = Vec::new();
579            let ts = current_time_ms();
580
581            for label in add {
582                let kind = EventKind::LabelAdded {
583                    label: label.clone(),
584                };
585                let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
586                let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
587                event_ids.push(id_to_hex(&event_id));
588                events.push(event);
589            }
590
591            for label in remove {
592                let kind = EventKind::LabelRemoved {
593                    label: label.clone(),
594                };
595                let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
596                let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
597                event_ids.push(id_to_hex(&event_id));
598                events.push(event);
599            }
600
601            persist_events(store, wal.as_ref(), &actor_id_bytes, &events)?;
602
603            let json = serde_json::to_string(&serde_json::json!({
604                "issue_id": issue_id,
605                "event_ids": event_ids,
606            }))?;
607            Ok(Some(json))
608        }
609
610        IpcCommand::IssueAssign {
611            issue_id,
612            add,
613            remove,
614        } => {
615            let id = store
616                .resolve_issue_id(issue_id)
617                .map_err(DaemonError::Core)?;
618            store.get_issue(&id)?.ok_or_else(|| {
619                DaemonError::Core(GriteError::NotFound(format!(
620                    "Issue {} not found",
621                    issue_id
622                )))
623            })?;
624
625            let mut event_ids = Vec::new();
626            let mut events = Vec::new();
627            let ts = current_time_ms();
628
629            for user in add {
630                let kind = EventKind::AssigneeAdded { user: user.clone() };
631                let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
632                let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
633                event_ids.push(id_to_hex(&event_id));
634                events.push(event);
635            }
636
637            for user in remove {
638                let kind = EventKind::AssigneeRemoved { user: user.clone() };
639                let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
640                let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
641                event_ids.push(id_to_hex(&event_id));
642                events.push(event);
643            }
644
645            persist_events(store, wal.as_ref(), &actor_id_bytes, &events)?;
646
647            let json = serde_json::to_string(&serde_json::json!({
648                "issue_id": issue_id,
649                "event_ids": event_ids,
650            }))?;
651            Ok(Some(json))
652        }
653
654        IpcCommand::IssueLink {
655            issue_id,
656            url,
657            note,
658        } => {
659            let id = store
660                .resolve_issue_id(issue_id)
661                .map_err(DaemonError::Core)?;
662            store.get_issue(&id)?.ok_or_else(|| {
663                DaemonError::Core(GriteError::NotFound(format!(
664                    "Issue {} not found",
665                    issue_id
666                )))
667            })?;
668
669            let ts = current_time_ms();
670            let kind = EventKind::LinkAdded {
671                url: url.clone(),
672                note: note.clone(),
673            };
674            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
675            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
676
677            persist_events(
678                store,
679                wal.as_ref(),
680                &actor_id_bytes,
681                std::slice::from_ref(&event),
682            )?;
683
684            let json = serde_json::to_string(&serde_json::json!({
685                "issue_id": issue_id,
686                "event_id": id_to_hex(&event_id),
687            }))?;
688            Ok(Some(json))
689        }
690
691        IpcCommand::IssueAttach {
692            issue_id,
693            file_path,
694        } => {
695            let id = store
696                .resolve_issue_id(issue_id)
697                .map_err(DaemonError::Core)?;
698            store.get_issue(&id)?.ok_or_else(|| {
699                DaemonError::Core(GriteError::NotFound(format!(
700                    "Issue {} not found",
701                    issue_id
702                )))
703            })?;
704
705            let parts: Vec<&str> = file_path.splitn(3, ':').collect();
706            if parts.len() != 3 {
707                return Err(DaemonError::Core(GriteError::InvalidArgs(
708                    "file_path must be in format 'name:sha256:mime'".to_string(),
709                )));
710            }
711
712            let name = parts[0].to_string();
713            let sha256: [u8; 32] = hex_to_id(parts[1])
714                .map_err(|e| DaemonError::Core(GriteError::InvalidArgs(e.to_string())))?;
715            let mime = parts[2].to_string();
716
717            let ts = current_time_ms();
718            let kind = EventKind::AttachmentAdded { name, sha256, mime };
719            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
720            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
721
722            persist_events(
723                store,
724                wal.as_ref(),
725                &actor_id_bytes,
726                std::slice::from_ref(&event),
727            )?;
728
729            let json = serde_json::to_string(&serde_json::json!({
730                "issue_id": issue_id,
731                "event_id": id_to_hex(&event_id),
732            }))?;
733            Ok(Some(json))
734        }
735
736        IpcCommand::DbStats => {
737            let stats = store.stats(sled_path)?;
738            let json = serde_json::to_string(&serde_json::json!({
739                "path": stats.path,
740                "size_bytes": stats.size_bytes,
741                "event_count": stats.event_count,
742                "issue_count": stats.issue_count,
743                "last_rebuild_ts": stats.last_rebuild_ts,
744            }))?;
745            Ok(Some(json))
746        }
747
748        IpcCommand::Rebuild => {
749            let stats = store.rebuild()?;
750            let json = serde_json::to_string(&serde_json::json!({
751                "event_count": stats.event_count,
752                "issue_count": stats.issue_count,
753            }))?;
754            Ok(Some(json))
755        }
756
757        IpcCommand::Export { format, since } => {
758            let since_opt = since
759                .as_ref()
760                .and_then(|s| s.parse::<u64>().ok())
761                .map(ExportSince::Timestamp);
762
763            let output = match format.as_str() {
764                "json" => {
765                    let export = export_json(store, since_opt)?;
766                    serde_json::to_string(&export)?
767                }
768                "md" | "markdown" => export_markdown(store, since_opt)?,
769                _ => {
770                    return Err(DaemonError::Core(GriteError::InvalidArgs(format!(
771                        "Unknown format: {}",
772                        format
773                    ))))
774                }
775            };
776            Ok(Some(output))
777        }
778
779        IpcCommand::IssueDepAdd {
780            issue_id,
781            target_id,
782            dep_type,
783        } => {
784            use libgrite_core::hash::compute_event_id;
785            use libgrite_core::types::event::{DependencyType, Event, EventKind};
786            use libgrite_core::types::ids::id_to_hex;
787
788            let id = store
789                .resolve_issue_id(issue_id)
790                .map_err(DaemonError::Core)?;
791            let target = store
792                .resolve_issue_id(target_id)
793                .map_err(DaemonError::Core)?;
794            let dep = DependencyType::from_str(dep_type).ok_or_else(|| {
795                DaemonError::Core(GriteError::InvalidArgs(format!(
796                    "Invalid dep type: {}",
797                    dep_type
798                )))
799            })?;
800
801            store.get_issue(&id)?.ok_or_else(|| {
802                DaemonError::Core(GriteError::NotFound(format!(
803                    "Issue {} not found",
804                    issue_id
805                )))
806            })?;
807            store.get_issue(&target)?.ok_or_else(|| {
808                DaemonError::Core(GriteError::NotFound(format!(
809                    "Target {} not found",
810                    target_id
811                )))
812            })?;
813
814            if store.would_create_cycle(&id, &target, &dep)? {
815                return Err(DaemonError::Core(GriteError::InvalidArgs(format!(
816                    "Adding this dependency would create a cycle in the {} graph",
817                    dep.as_str()
818                ))));
819            }
820
821            let ts = current_time_ms();
822            let kind = EventKind::DependencyAdded {
823                target,
824                dep_type: dep,
825            };
826            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
827            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
828            persist_events(
829                store,
830                wal.as_ref(),
831                &actor_id_bytes,
832                std::slice::from_ref(&event),
833            )?;
834
835            let json = serde_json::to_string(&serde_json::json!({
836                "event_id": id_to_hex(&event_id),
837                "issue_id": issue_id,
838                "target": target_id,
839                "dep_type": dep_type,
840                "action": "added",
841            }))?;
842            Ok(Some(json))
843        }
844
845        IpcCommand::IssueDepRemove {
846            issue_id,
847            target_id,
848            dep_type,
849        } => {
850            use libgrite_core::hash::compute_event_id;
851            use libgrite_core::types::event::{DependencyType, Event, EventKind};
852            use libgrite_core::types::ids::id_to_hex;
853
854            let id = store
855                .resolve_issue_id(issue_id)
856                .map_err(DaemonError::Core)?;
857            let target = store
858                .resolve_issue_id(target_id)
859                .map_err(DaemonError::Core)?;
860            let dep = DependencyType::from_str(dep_type).ok_or_else(|| {
861                DaemonError::Core(GriteError::InvalidArgs(format!(
862                    "Invalid dep type: {}",
863                    dep_type
864                )))
865            })?;
866
867            let ts = current_time_ms();
868            let kind = EventKind::DependencyRemoved {
869                target,
870                dep_type: dep,
871            };
872            let event_id = compute_event_id(&id, &actor_id_bytes, ts, None, &kind);
873            let event = Event::new(event_id, id, actor_id_bytes, ts, None, kind);
874            persist_events(
875                store,
876                wal.as_ref(),
877                &actor_id_bytes,
878                std::slice::from_ref(&event),
879            )?;
880
881            let json = serde_json::to_string(&serde_json::json!({
882                "event_id": id_to_hex(&event_id),
883                "issue_id": issue_id,
884                "target": target_id,
885                "dep_type": dep_type,
886                "action": "removed",
887            }))?;
888            Ok(Some(json))
889        }
890
891        IpcCommand::IssueDepList { issue_id, reverse } => {
892            use libgrite_core::types::ids::id_to_hex;
893
894            let id = store
895                .resolve_issue_id(issue_id)
896                .map_err(DaemonError::Core)?;
897            let deps = if *reverse {
898                store.get_dependents(&id)?
899            } else {
900                store.get_dependencies(&id)?
901            };
902            let dep_list: Vec<serde_json::Value> = deps
903                .iter()
904                .map(|(target, dep_type)| {
905                    let title = match store.get_issue(target) {
906                        Ok(Some(p)) => p.title.clone(),
907                        Ok(None) => "?".to_string(),
908                        Err(e) => return Err(DaemonError::Core(e)),
909                    };
910                    Ok(serde_json::json!({
911                        "issue_id": id_to_hex(target),
912                        "dep_type": dep_type.as_str(),
913                        "title": title,
914                    }))
915                })
916                .collect::<Result<Vec<_>, DaemonError>>()?;
917            let json = serde_json::to_string(&serde_json::json!({
918                "issue_id": issue_id,
919                "direction": if *reverse { "dependents" } else { "dependencies" },
920                "deps": dep_list,
921            }))?;
922            Ok(Some(json))
923        }
924
925        IpcCommand::IssueDepTopo { state, label } => {
926            use libgrite_core::types::event::IssueState;
927            use libgrite_core::types::ids::id_to_hex;
928
929            let filter = IssueFilter {
930                state: state.as_deref().map(|s| match s {
931                    "closed" => IssueState::Closed,
932                    _ => IssueState::Open,
933                }),
934                label: label.clone(),
935            };
936            let sorted = store.topological_order(&filter)?;
937            let issues: Vec<serde_json::Value> = sorted
938                .iter()
939                .map(|s| {
940                    serde_json::json!({
941                        "issue_id": id_to_hex(&s.issue_id),
942                        "title": s.title,
943                        "state": format!("{:?}", s.state).to_lowercase(),
944                        "labels": s.labels,
945                    })
946                })
947                .collect();
948            let json = serde_json::to_string(&serde_json::json!({
949                "issues": issues,
950                "order": "topological",
951            }))?;
952            Ok(Some(json))
953        }
954
955        // DaemonStatus and DaemonStop are handled at the supervisor level
956        // in process_request() and never reach the worker.
957        IpcCommand::DaemonStatus | IpcCommand::DaemonStop => Err(DaemonError::Core(
958            GriteError::Internal("supervisor-only command received by worker".to_string()),
959        )),
960
961        IpcCommand::Sync { remote, pull, push } => {
962            let sync_mgr = SyncManager::open(git_dir)?;
963
964            // If neither flag is set, do both pull and push
965            let do_pull = *pull || !*push;
966            let do_push = *push || !*pull;
967
968            // Auto-backfill WAL from sled if WAL is empty
969            if do_push {
970                if let Some(w) = wal.as_ref() {
971                    if w.head().unwrap_or(None).is_none() {
972                        let events = store.get_all_events().unwrap_or_default();
973                        if !events.is_empty() {
974                            let mut sorted = events;
975                            sorted.sort_by_key(|e| e.ts_unix_ms);
976                            match w.append(&actor_id_bytes, &sorted) {
977                                Ok(_) => info!("Auto-backfilled WAL with {} events", sorted.len()),
978                                Err(e) => warn!("WAL backfill failed: {}", e),
979                            }
980                        }
981                    }
982                }
983            }
984
985            let result = if do_pull && !do_push {
986                // Pull only
987                let pull_result = sync_mgr.pull(remote)?;
988                let wal_head: Option<String> = pull_result.new_wal_head.map(|oid| oid.to_string());
989                serde_json::json!({
990                    "pulled": true,
991                    "pushed": false,
992                    "pull_events": pull_result.events_pulled,
993                    "pull_wal_head": wal_head,
994                    "message": pull_result.message,
995                })
996            } else if do_push && !do_pull {
997                // Push only with auto-rebase
998                let push_result = sync_mgr.push_with_rebase(remote, &actor_id_bytes)?;
999                serde_json::json!({
1000                    "pulled": false,
1001                    "pushed": true,
1002                    "push_success": push_result.success,
1003                    "push_rebased": push_result.rebased,
1004                    "push_events_rebased": push_result.events_rebased,
1005                    "message": push_result.message,
1006                })
1007            } else {
1008                // Full sync: pull then push with auto-rebase
1009                let (pull_result, push_result) =
1010                    sync_mgr.sync_with_rebase(remote, &actor_id_bytes)?;
1011                let wal_head: Option<String> = pull_result.new_wal_head.map(|oid| oid.to_string());
1012                serde_json::json!({
1013                    "pulled": true,
1014                    "pushed": true,
1015                    "pull_events": pull_result.events_pulled,
1016                    "pull_wal_head": wal_head,
1017                    "push_success": push_result.success,
1018                    "push_rebased": push_result.rebased,
1019                    "push_events_rebased": push_result.events_rebased,
1020                    "message": format!("{} / {}", pull_result.message, push_result.message),
1021                })
1022            };
1023
1024            Ok(Some(result.to_string()))
1025        }
1026
1027        IpcCommand::SnapshotCreate | IpcCommand::SnapshotList | IpcCommand::SnapshotGc { .. } => {
1028            Err(DaemonError::Core(GriteError::Internal(
1029                "Snapshot through daemon not yet implemented - use --no-daemon".to_string(),
1030            )))
1031        }
1032    }
1033}
1034
1035/// Convert an IssueProjection to a JSON value with hex-encoded IDs
1036fn projection_to_json(p: &libgrite_core::types::issue::IssueProjection) -> serde_json::Value {
1037    use libgrite_core::types::ids::id_to_hex;
1038
1039    let comments: Vec<serde_json::Value> = p
1040        .comments
1041        .iter()
1042        .map(|c| {
1043            serde_json::json!({
1044                "event_id": id_to_hex(&c.event_id),
1045                "actor": id_to_hex(&c.actor),
1046                "ts_unix_ms": c.ts_unix_ms,
1047                "body": c.body,
1048            })
1049        })
1050        .collect();
1051    let links: Vec<serde_json::Value> = p
1052        .links
1053        .iter()
1054        .map(|l| {
1055            serde_json::json!({
1056                "event_id": id_to_hex(&l.event_id),
1057                "url": l.url,
1058                "note": l.note,
1059            })
1060        })
1061        .collect();
1062    let attachments: Vec<serde_json::Value> = p
1063        .attachments
1064        .iter()
1065        .map(|a| {
1066            serde_json::json!({
1067                "event_id": id_to_hex(&a.event_id),
1068                "name": a.name,
1069                "sha256": hex::encode(a.sha256),
1070                "mime": a.mime,
1071            })
1072        })
1073        .collect();
1074    let deps: Vec<serde_json::Value> = p
1075        .dependencies
1076        .iter()
1077        .map(|d| {
1078            serde_json::json!({
1079                "target": id_to_hex(&d.target),
1080                "dep_type": d.dep_type.as_str(),
1081            })
1082        })
1083        .collect();
1084
1085    serde_json::json!({
1086        "issue_id": id_to_hex(&p.issue_id),
1087        "title": p.title,
1088        "body": p.body,
1089        "state": format!("{:?}", p.state).to_lowercase(),
1090        "labels": p.labels,
1091        "assignees": p.assignees,
1092        "comments": comments,
1093        "links": links,
1094        "attachments": attachments,
1095        "dependencies": deps,
1096        "created_ts": p.created_ts,
1097        "updated_ts": p.updated_ts,
1098    })
1099}
1100
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn current_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        // Clock is set before the epoch: equivalent to a zero duration.
        Err(_) => 0,
    }
}
1108
1109/// Convert error to (code, message) for IPC response
1110fn error_to_code_message(e: &DaemonError) -> (String, String) {
1111    use libgrite_ipc::error::codes;
1112
1113    match e {
1114        DaemonError::Core(GriteError::NotFound(_)) => (codes::NOT_FOUND.to_string(), e.to_string()),
1115        DaemonError::Core(GriteError::InvalidArgs(_)) => {
1116            (codes::INVALID_INPUT.to_string(), e.to_string())
1117        }
1118        DaemonError::Core(GriteError::Io(_)) => (codes::IO_ERROR.to_string(), e.to_string()),
1119        DaemonError::Git(_) => (codes::GIT_ERROR.to_string(), e.to_string()),
1120        DaemonError::Ipc(_) => (codes::IPC_ERROR.to_string(), e.to_string()),
1121        _ => (codes::INTERNAL.to_string(), e.to_string()),
1122    }
1123}