Skip to main content

daemon/grpc_local_impl/
timeline.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local gRPC service for agent timeline operation objects.
3
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use grpc::heddle::v1::{
7    AgentTimelineBranchCreated, AgentTimelineCursorMoved, AgentTimelineNativeToolCall,
8    AgentTimelineOperationDraft, AgentTimelineOperationRecord, AgentTimelineStateSummary,
9    AgentTimelineStatus, AgentTimelineStepSummary, AgentTimelineToolCallFinished,
10    AgentTimelineToolCallStarted, AgentTimelineToolPayload, CreateTimelineBranchRequest,
11    CreateTimelineBranchResponse, ForkTimelineFromSelectorRequest,
12    ForkTimelineFromSelectorResponse, GetTimelineNavigationRequest, GetTimelineOperationRequest,
13    GetTimelineStatusRequest, ListTimelineStepsRequest, ListTimelineStepsResponse,
14    MaterializeTimelineCursorRequest, PreviewTimelineSeekRequest, RecordTimelineOperationRequest,
15    RecoverTimelineMaterializationRequest, RecoverTimelineMaterializationResponse,
16    ResetTimelineCursorRequest, ResetTimelineCursorResponse, ResolveNativeToolCallRequest,
17    SeekTimelineToNativeToolCallRequest, SeekTimelineToStepRequest, TimelineCursorMoveResponse,
18    TimelineCursorRequest, TimelineCursorSelector,
19    TimelineMaterializationBlocker as WireTimelineMaterializationBlocker,
20    TimelineMaterializationBlockerKind,
21    TimelineMaterializationRecoveryBlocker as WireTimelineMaterializationRecoveryBlocker,
22    TimelineMaterializationRecoveryBlockerKind,
23    TimelineMaterializationRecoveryStatus as WireTimelineMaterializationRecoveryStatus,
24    TimelineMaterializeMode, TimelineMaterializeResponse,
25    TimelineMaterializeStatus as WireTimelineMaterializeStatus,
26    TimelineNavigationActionAvailability as WireTimelineNavigationActionAvailability,
27    TimelineNavigationBranch as WireTimelineNavigationBranch,
28    TimelineNavigationCursor as WireTimelineNavigationCursor,
29    TimelineNavigationRecovery as WireTimelineNavigationRecovery,
30    TimelineNavigationRecoveryStatus as WireTimelineNavigationRecoveryStatus,
31    TimelineNavigationSnapshot as WireTimelineNavigationSnapshot,
32    TimelineNavigationStep as WireTimelineNavigationStep, TimelineSeekNativeToolCallSelector,
33    TimelineSeekPreview, TimelineSeekSelector, TimelineSeekStepSelector,
34    agent_timeline_operation_draft, agent_timeline_operation_record, timeline_seek_selector,
35    timeline_service_server::TimelineService,
36};
37use objects::object::{
38    BranchCreatedV1, ChangeId, ContentHash, CursorMovedV1, NativeToolCallRefV1, TimelineBranchId,
39    TimelineBranchReason, TimelineCursorMoveReason, TimelineLabel, TimelineOperationBodyV1,
40    TimelineOperationEnvelope, TimelineOperationId, TimelineStepId, TimelineToolCallStatus,
41    TimelineToolPayloadMetadata, ToolCallFinishedV1, ToolCallStartedV1,
42};
43use prost::Message;
44use repo::{
45    TimelineCursorMoveRecord, TimelineMaterializationBlocker,
46    TimelineMaterializationRecoveryBlocker, TimelineMaterializationRecoveryOutcome,
47    TimelineMaterializationRecoveryStatus, TimelineMaterializeMode as RepoTimelineMaterializeMode,
48    TimelineMaterializeOutcome, TimelineMaterializeStatus, TimelineNativeToolKey,
49    TimelineNavigationRecoveryStatus as RepoTimelineNavigationRecoveryStatus,
50    TimelineNavigationSnapshot as RepoTimelineNavigationSnapshot,
51    TimelineNavigationStep as RepoTimelineNavigationStep, TimelineSeekBranchConstraint,
52    TimelineSeekPreview as RepoTimelineSeekPreview,
53    TimelineSeekSelector as RepoTimelineSeekSelector, TimelineSeekTarget, TimelineStepSummary,
54    TimelineStore, TimelineThreadStatus, TimelineView,
55};
56use tonic::{Request, Response, Status};
57
58use super::{GrpcLocalService, to_status, with_idempotency};
59
60#[derive(Clone)]
61pub struct LocalTimelineService {
62    inner: GrpcLocalService,
63}
64
65impl LocalTimelineService {
66    pub fn new(inner: GrpcLocalService) -> Self {
67        Self { inner }
68    }
69}
70
71#[tonic::async_trait]
72impl TimelineService for LocalTimelineService {
73    async fn record_operation(
74        &self,
75        request: Request<RecordTimelineOperationRequest>,
76    ) -> Result<Response<AgentTimelineOperationRecord>, Status> {
77        let req = request.into_inner();
78        let body = req.encode_to_vec();
79        let client_op = req.client_operation_id.clone();
80        let inner = self.inner.clone();
81
82        let response = with_idempotency(
83            &self.inner,
84            &client_op,
85            "TimelineService.RecordOperation",
86            &body,
87            move || async move {
88                let draft = req
89                    .operation
90                    .ok_or_else(|| Status::invalid_argument("operation is required"))?;
91                let envelope = draft_to_envelope(draft)?;
92                let bytes = envelope
93                    .encode()
94                    .map_err(|err| Status::invalid_argument(err.to_string()))?;
95                let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
96                let id = store.write_operation_bytes(&bytes).map_err(to_status)?;
97                record_from_envelope(id, envelope, bytes)
98            },
99        )
100        .await?;
101
102        Ok(Response::new(response))
103    }
104
105    async fn get_operation(
106        &self,
107        request: Request<GetTimelineOperationRequest>,
108    ) -> Result<Response<AgentTimelineOperationRecord>, Status> {
109        let req = request.into_inner();
110        let id = TimelineOperationId::try_from_slice(&req.operation_id)
111            .map_err(|err| Status::invalid_argument(format!("invalid operation_id: {err}")))?;
112        let store = TimelineStore::open(self.inner.repo().heddle_dir()).map_err(to_status)?;
113        let bytes = store
114            .read_operation_bytes(&id)
115            .map_err(to_status)?
116            .ok_or_else(|| Status::not_found(format!("timeline operation {}", id.short())))?;
117        let envelope = TimelineOperationEnvelope::decode(&bytes)
118            .map_err(|err| Status::internal(format!("decode stored timeline operation: {err}")))?;
119        Ok(Response::new(record_from_envelope(id, envelope, bytes)?))
120    }
121
122    async fn get_timeline_status(
123        &self,
124        request: Request<GetTimelineStatusRequest>,
125    ) -> Result<Response<AgentTimelineStatus>, Status> {
126        let req = request.into_inner();
127        let (_store, view) = open_timeline_store_and_view(&self.inner)?;
128        Ok(Response::new(status_for_thread(&view, &req.thread)))
129    }
130
131    async fn get_timeline_navigation(
132        &self,
133        request: Request<GetTimelineNavigationRequest>,
134    ) -> Result<Response<WireTimelineNavigationSnapshot>, Status> {
135        let req = request.into_inner();
136        let snapshot = get_timeline_navigation_impl(&self.inner, req)?;
137        Ok(Response::new(snapshot))
138    }
139
140    async fn list_timeline_steps(
141        &self,
142        request: Request<ListTimelineStepsRequest>,
143    ) -> Result<Response<ListTimelineStepsResponse>, Status> {
144        let req = request.into_inner();
145        let (_store, view) = open_timeline_store_and_view(&self.inner)?;
146        let branch_id = if req.branch_id.is_empty() {
147            view.status(&req.thread)
148                .and_then(|status| status.current_branch_id.clone())
149        } else {
150            Some(TimelineBranchId::new(req.branch_id.clone()))
151        };
152        let mut steps = branch_id
153            .as_ref()
154            .map(|branch_id| {
155                view.list_branch_steps(&req.thread, branch_id)
156                    .into_iter()
157                    .map(step_summary_to_proto)
158                    .collect::<Vec<_>>()
159            })
160            .unwrap_or_default();
161        if req.limit > 0 && steps.len() > req.limit as usize {
162            let keep_from = steps.len() - req.limit as usize;
163            steps = steps.split_off(keep_from);
164        }
165        Ok(Response::new(ListTimelineStepsResponse {
166            steps,
167            status: Some(status_for_thread(&view, &req.thread)),
168        }))
169    }
170
171    async fn resolve_native_tool_call(
172        &self,
173        request: Request<ResolveNativeToolCallRequest>,
174    ) -> Result<Response<AgentTimelineStepSummary>, Status> {
175        let req = request.into_inner();
176        let (_store, view) = open_timeline_store_and_view(&self.inner)?;
177        let native = native_key_from_resolve_request(&req);
178        let step = view
179            .find_step_by_native_call(&req.thread, &native)
180            .ok_or_else(|| Status::not_found("native tool call not found"))?;
181        Ok(Response::new(step_summary_to_proto(step)))
182    }
183
184    async fn seek_to_step(
185        &self,
186        request: Request<SeekTimelineToStepRequest>,
187    ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
188        let req = request.into_inner();
189        let body = req.encode_to_vec();
190        let client_op = req.client_operation_id.clone();
191        let inner = self.inner.clone();
192
193        let response = with_idempotency(
194            &self.inner,
195            &client_op,
196            "TimelineService.SeekToStep",
197            &body,
198            move || async move { seek_to_step_impl(&inner, req).await },
199        )
200        .await?;
201
202        Ok(Response::new(response))
203    }
204
205    async fn seek_to_native_tool_call(
206        &self,
207        request: Request<SeekTimelineToNativeToolCallRequest>,
208    ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
209        let req = request.into_inner();
210        let body = req.encode_to_vec();
211        let client_op = req.client_operation_id.clone();
212        let inner = self.inner.clone();
213
214        let response = with_idempotency(
215            &self.inner,
216            &client_op,
217            "TimelineService.SeekToNativeToolCall",
218            &body,
219            move || async move { seek_to_native_tool_call_impl(&inner, req).await },
220        )
221        .await?;
222
223        Ok(Response::new(response))
224    }
225
226    async fn undo_tool_call(
227        &self,
228        request: Request<TimelineCursorRequest>,
229    ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
230        let req = request.into_inner();
231        let body = req.encode_to_vec();
232        let client_op = req.client_operation_id.clone();
233        let inner = self.inner.clone();
234
235        let response = with_idempotency(
236            &self.inner,
237            &client_op,
238            "TimelineService.UndoToolCall",
239            &body,
240            move || async move { move_cursor_by_delta_impl(&inner, req, -1).await },
241        )
242        .await?;
243
244        Ok(Response::new(response))
245    }
246
247    async fn redo_tool_call(
248        &self,
249        request: Request<TimelineCursorRequest>,
250    ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
251        let req = request.into_inner();
252        let body = req.encode_to_vec();
253        let client_op = req.client_operation_id.clone();
254        let inner = self.inner.clone();
255
256        let response = with_idempotency(
257            &self.inner,
258            &client_op,
259            "TimelineService.RedoToolCall",
260            &body,
261            move || async move { move_cursor_by_delta_impl(&inner, req, 1).await },
262        )
263        .await?;
264
265        Ok(Response::new(response))
266    }
267
268    async fn create_timeline_branch(
269        &self,
270        request: Request<CreateTimelineBranchRequest>,
271    ) -> Result<Response<CreateTimelineBranchResponse>, Status> {
272        let req = request.into_inner();
273        let body = req.encode_to_vec();
274        let client_op = req.client_operation_id.clone();
275        let inner = self.inner.clone();
276
277        let response = with_idempotency(
278            &self.inner,
279            &client_op,
280            "TimelineService.CreateTimelineBranch",
281            &body,
282            move || async move { create_timeline_branch_impl(&inner, req).await },
283        )
284        .await?;
285
286        Ok(Response::new(response))
287    }
288
289    async fn preview_timeline_seek(
290        &self,
291        request: Request<PreviewTimelineSeekRequest>,
292    ) -> Result<Response<TimelineSeekPreview>, Status> {
293        let req = request.into_inner();
294        let preview = preview_timeline_seek_impl(&self.inner, req)?;
295        Ok(Response::new(preview))
296    }
297
298    async fn materialize_timeline_cursor(
299        &self,
300        request: Request<MaterializeTimelineCursorRequest>,
301    ) -> Result<Response<TimelineMaterializeResponse>, Status> {
302        let req = request.into_inner();
303        let body = req.encode_to_vec();
304        let client_op = req.client_operation_id.clone();
305        let inner = self.inner.clone();
306
307        let response = with_idempotency(
308            &self.inner,
309            &client_op,
310            "TimelineService.MaterializeTimelineCursor",
311            &body,
312            move || async move { materialize_timeline_cursor_impl(&inner, req).await },
313        )
314        .await?;
315
316        Ok(Response::new(response))
317    }
318
319    async fn fork_timeline_from_selector(
320        &self,
321        request: Request<ForkTimelineFromSelectorRequest>,
322    ) -> Result<Response<ForkTimelineFromSelectorResponse>, Status> {
323        let req = request.into_inner();
324        let body = req.encode_to_vec();
325        let client_op = req.client_operation_id.clone();
326        let inner = self.inner.clone();
327
328        let response = with_idempotency(
329            &self.inner,
330            &client_op,
331            "TimelineService.ForkTimelineFromSelector",
332            &body,
333            move || async move { fork_timeline_from_selector_impl(&inner, req).await },
334        )
335        .await?;
336
337        Ok(Response::new(response))
338    }
339
340    async fn reset_timeline_cursor(
341        &self,
342        request: Request<ResetTimelineCursorRequest>,
343    ) -> Result<Response<ResetTimelineCursorResponse>, Status> {
344        let req = request.into_inner();
345        let body = req.encode_to_vec();
346        let client_op = req.client_operation_id.clone();
347        let inner = self.inner.clone();
348
349        let response = with_idempotency(
350            &self.inner,
351            &client_op,
352            "TimelineService.ResetTimelineCursor",
353            &body,
354            move || async move { reset_timeline_cursor_impl(&inner, req).await },
355        )
356        .await?;
357
358        Ok(Response::new(response))
359    }
360
361    async fn recover_timeline_materialization(
362        &self,
363        request: Request<RecoverTimelineMaterializationRequest>,
364    ) -> Result<Response<RecoverTimelineMaterializationResponse>, Status> {
365        let req = request.into_inner();
366        let body = req.encode_to_vec();
367        let client_op = req.client_operation_id.clone();
368        let inner = self.inner.clone();
369
370        let response = with_idempotency(
371            &self.inner,
372            &client_op,
373            "TimelineService.RecoverTimelineMaterialization",
374            &body,
375            move || async move { recover_timeline_materialization_impl(&inner, req).await },
376        )
377        .await?;
378
379        Ok(Response::new(response))
380    }
381}
382
383async fn seek_to_step_impl(
384    inner: &GrpcLocalService,
385    req: SeekTimelineToStepRequest,
386) -> Result<TimelineCursorMoveResponse, Status> {
387    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
388    let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
389    let view = TimelineView::rebuild(&store).map_err(to_status)?;
390    let target = view
391        .resolve_seek_target(&req.thread, &TimelineStepId::new(req.step_id.clone()))
392        .ok_or_else(|| Status::not_found("timeline step not found"))?;
393    if !req.branch_id.is_empty() && target.branch_id.as_str() != req.branch_id {
394        return Err(Status::failed_precondition(
395            "timeline step belongs to a different branch",
396        ));
397    }
398    write_cursor_move(&store, &target, &view, parse_seek_reason(&req.reason)?)
399}
400
401async fn seek_to_native_tool_call_impl(
402    inner: &GrpcLocalService,
403    req: SeekTimelineToNativeToolCallRequest,
404) -> Result<TimelineCursorMoveResponse, Status> {
405    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
406    let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
407    let view = TimelineView::rebuild(&store).map_err(to_status)?;
408    let target = view
409        .resolve_seek_to_native_call(&req.thread, &native_key_from_parts(&req))
410        .ok_or_else(|| Status::not_found("native tool call not found"))?
411        .clone();
412    write_cursor_move(&store, &target, &view, parse_seek_reason(&req.reason)?)
413}
414
415async fn move_cursor_by_delta_impl(
416    inner: &GrpcLocalService,
417    req: TimelineCursorRequest,
418    delta: i32,
419) -> Result<TimelineCursorMoveResponse, Status> {
420    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
421    let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
422    let view = TimelineView::rebuild(&store).map_err(to_status)?;
423    if !req.branch_id.is_empty()
424        && view
425            .status(&req.thread)
426            .and_then(|status| status.current_branch_id.as_ref())
427            .is_some_and(|branch_id| branch_id.as_str() != req.branch_id)
428    {
429        return Err(Status::failed_precondition(
430            "timeline cursor is on a different branch",
431        ));
432    }
433    let target = if delta < 0 {
434        view.resolve_undo_target(&req.thread)
435    } else {
436        view.resolve_redo_target(&req.thread)
437    }
438    .ok_or_else(|| Status::failed_precondition("timeline cursor is not initialized"))?;
439    write_cursor_move(
440        &store,
441        &target,
442        &view,
443        if delta < 0 {
444            TimelineCursorMoveReason::Undo
445        } else {
446            TimelineCursorMoveReason::Redo
447        },
448    )
449}
450
451async fn create_timeline_branch_impl(
452    inner: &GrpcLocalService,
453    req: CreateTimelineBranchRequest,
454) -> Result<CreateTimelineBranchResponse, Status> {
455    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
456    let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
457    let view = TimelineView::rebuild(&store).map_err(to_status)?;
458    let target = if req.from_step_id.is_empty() {
459        let status = view.status(&req.thread).ok_or_else(|| {
460            Status::failed_precondition("from_step_id is required when the cursor has no step")
461        })?;
462        let branch_id = status.current_branch_id.clone().ok_or_else(|| {
463            Status::failed_precondition("from_step_id is required when the cursor has no branch")
464        })?;
465        let state = status.current_state.ok_or_else(|| {
466            Status::failed_precondition("from_step_id is required when the cursor has no state")
467        })?;
468        TimelineSeekTarget {
469            thread: req.thread.clone(),
470            branch_id,
471            step_id: status.current_step_id.clone(),
472            state,
473        }
474    } else {
475        view.resolve_seek_target(&req.thread, &TimelineStepId::new(req.from_step_id.clone()))
476            .ok_or_else(|| Status::not_found("timeline step not found"))?
477    };
478    let branch_id = if req.branch_id.is_empty() {
479        TimelineBranchId::generate()
480    } else {
481        TimelineBranchId::new(req.branch_id)
482    };
483    let body = BranchCreatedV1 {
484        thread: req.thread.clone(),
485        branch_id: branch_id.clone(),
486        parent_branch_id: Some(target.branch_id.clone()),
487        from_step_id: target.step_id.clone(),
488        from_state: target.state,
489        reason: parse_branch_reason_or_default(&req.reason)?,
490        created_at_ms: now_ms(),
491    };
492    let record = write_timeline_envelope(
493        &store,
494        TimelineOperationEnvelope::new(TimelineOperationBodyV1::BranchCreated(body), Vec::new()),
495    )?;
496    let view = TimelineView::rebuild(&store).map_err(to_status)?;
497    Ok(CreateTimelineBranchResponse {
498        status: Some(status_for_thread(&view, &req.thread)),
499        branch_id: branch_id.to_string(),
500        parent_branch_id: target.branch_id.to_string(),
501        from_step_id: target.step_id.map(|id| id.to_string()).unwrap_or_default(),
502        operation: Some(record),
503    })
504}
505
506fn get_timeline_navigation_impl(
507    inner: &GrpcLocalService,
508    req: GetTimelineNavigationRequest,
509) -> Result<WireTimelineNavigationSnapshot, Status> {
510    if req.thread.trim().is_empty() {
511        return Err(Status::invalid_argument("thread is required"));
512    }
513    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
514    let snapshot = inner
515        .repo()
516        .timeline_navigation_snapshot(&store, &req.thread)
517        .map_err(to_status)?;
518    let view = TimelineView::rebuild(&store).map_err(to_status)?;
519    Ok(timeline_navigation_snapshot_to_proto(&view, snapshot))
520}
521
522fn preview_timeline_seek_impl(
523    inner: &GrpcLocalService,
524    req: PreviewTimelineSeekRequest,
525) -> Result<TimelineSeekPreview, Status> {
526    let selection = repo_seek_selection(req.selector)?;
527    let mode = repo_materialize_mode(req.mode);
528    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
529    let preview = inner
530        .repo()
531        .preview_timeline_seek_constrained(
532            &store,
533            &selection.thread,
534            &selection.selector,
535            mode,
536            selection.branch_constraint.as_ref(),
537        )
538        .map_err(to_status)?;
539    let view = TimelineView::rebuild(&store).map_err(to_status)?;
540    Ok(repo_seek_preview_to_proto(&view, &preview))
541}
542
543async fn materialize_timeline_cursor_impl(
544    inner: &GrpcLocalService,
545    req: MaterializeTimelineCursorRequest,
546) -> Result<TimelineMaterializeResponse, Status> {
547    let selection = repo_seek_selection(req.selector)?;
548    let mode = repo_materialize_mode(req.mode);
549    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
550    let before_view = TimelineView::rebuild(&store).map_err(to_status)?;
551
552    let outcome = inner
553        .repo()
554        .materialize_timeline_cursor_constrained(
555            &store,
556            &selection.thread,
557            &selection.selector,
558            mode,
559            selection.branch_constraint.as_ref(),
560            now_ms(),
561        )
562        .map_err(to_status)?;
563    materialize_response_from_outcome(&store, &before_view, &selection.thread, outcome)
564}
565
566async fn fork_timeline_from_selector_impl(
567    inner: &GrpcLocalService,
568    req: ForkTimelineFromSelectorRequest,
569) -> Result<ForkTimelineFromSelectorResponse, Status> {
570    let selection = repo_seek_selection(req.selector)?;
571    let branch_id = non_empty(req.branch_id).map(TimelineBranchId::new);
572    let reason = parse_branch_reason_or_default(&req.reason)?;
573    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
574    let outcome = inner
575        .repo()
576        .fork_timeline_from_selector(
577            &store,
578            &selection.thread,
579            &selection.selector,
580            selection.branch_constraint.as_ref(),
581            branch_id,
582            reason,
583            now_ms(),
584        )
585        .map_err(to_status)?;
586    let operation = record_from_store(&store, outcome.operation_id)?;
587    let view = TimelineView::rebuild(&store).map_err(to_status)?;
588
589    Ok(ForkTimelineFromSelectorResponse {
590        navigation: Some(timeline_navigation_snapshot_to_proto(
591            &view,
592            outcome.navigation,
593        )),
594        operation: Some(operation),
595        branch_id: outcome.branch_id.to_string(),
596        parent_branch_id: outcome.parent_branch_id.to_string(),
597        from_step_id: outcome
598            .from_step_id
599            .map(|step_id| step_id.to_string())
600            .unwrap_or_default(),
601    })
602}
603
604async fn reset_timeline_cursor_impl(
605    inner: &GrpcLocalService,
606    req: ResetTimelineCursorRequest,
607) -> Result<ResetTimelineCursorResponse, Status> {
608    let selection = repo_seek_selection(req.selector)?;
609    let mode = repo_materialize_mode(req.mode);
610    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
611    let before_view = TimelineView::rebuild(&store).map_err(to_status)?;
612    let outcome = inner
613        .repo()
614        .reset_timeline_cursor(
615            &store,
616            &selection.thread,
617            &selection.selector,
618            mode,
619            selection.branch_constraint.as_ref(),
620            req.materialize_checkout,
621            now_ms(),
622        )
623        .map_err(to_status)?;
624    let cursor_operation = outcome
625        .cursor_operation_id
626        .map(|id| record_from_store(&store, id))
627        .transpose()?;
628    let materialization = outcome
629        .materialization
630        .map(|materialization| {
631            materialize_response_from_outcome(
632                &store,
633                &before_view,
634                &selection.thread,
635                materialization,
636            )
637        })
638        .transpose()?;
639    let view = TimelineView::rebuild(&store).map_err(to_status)?;
640
641    Ok(ResetTimelineCursorResponse {
642        navigation: Some(timeline_navigation_snapshot_to_proto(
643            &view,
644            outcome.navigation,
645        )),
646        cursor_operation,
647        materialization,
648    })
649}
650
651async fn recover_timeline_materialization_impl(
652    inner: &GrpcLocalService,
653    req: RecoverTimelineMaterializationRequest,
654) -> Result<RecoverTimelineMaterializationResponse, Status> {
655    if req.thread.trim().is_empty() {
656        return Err(Status::invalid_argument("thread is required"));
657    }
658    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
659    let outcome = inner
660        .repo()
661        .recover_timeline_materialization_action(&store, &req.thread)
662        .map_err(to_status)?;
663    let recovered_cursor_operation = outcome
664        .recovery
665        .cursor_operation_id
666        .map(|id| record_from_store(&store, id))
667        .transpose()?;
668    let recovery_status = recovery_status_to_wire_code(&outcome.recovery);
669    let recovery_blockers = recovery_blockers_to_wire_details(&outcome.recovery);
670    let view = TimelineView::rebuild(&store).map_err(to_status)?;
671
672    Ok(RecoverTimelineMaterializationResponse {
673        navigation: Some(timeline_navigation_snapshot_to_proto(
674            &view,
675            outcome.navigation,
676        )),
677        recovered_cursor_operation,
678        recovery_status: recovery_status as i32,
679        recovery_blockers,
680    })
681}
682
683fn materialize_response_from_outcome(
684    store: &TimelineStore,
685    before_view: &TimelineView,
686    thread: &str,
687    outcome: TimelineMaterializeOutcome,
688) -> Result<TimelineMaterializeResponse, Status> {
689    let status = outcome.status.clone();
690    let cursor_operation = outcome
691        .cursor_operation_id
692        .map(|id| record_from_store(store, id))
693        .transpose()?;
694    let recovered_cursor_operation = outcome
695        .recovery
696        .cursor_operation_id
697        .map(|id| record_from_store(store, id))
698        .transpose()?;
699    let after_view = TimelineView::rebuild(store).map_err(to_status)?;
700    let blockers = blockers_to_wire_details(&outcome.preview.blockers);
701    let recovery_blockers = recovery_blockers_to_wire_details(&outcome.recovery);
702
703    Ok(TimelineMaterializeResponse {
704        updated_status: Some(status_for_thread(&after_view, thread)),
705        preview: Some(repo_seek_preview_to_proto(before_view, &outcome.preview)),
706        cursor_operation,
707        materialized: matches!(
708            status,
709            TimelineMaterializeStatus::Materialized | TimelineMaterializeStatus::AlreadyAtTarget
710        ),
711        blockers,
712        status: materialize_status_to_wire_code(&status) as i32,
713        recovered_cursor_operation,
714        recovery_status: recovery_status_to_wire_code(&outcome.recovery) as i32,
715        recovery_blockers,
716    })
717}
718
719struct RepoSeekSelection {
720    thread: String,
721    selector: RepoTimelineSeekSelector,
722    branch_constraint: Option<TimelineSeekBranchConstraint>,
723}
724
725fn repo_seek_selection(
726    selector: Option<TimelineSeekSelector>,
727) -> Result<RepoSeekSelection, Status> {
728    let selector = selector.ok_or_else(|| Status::invalid_argument("selector is required"))?;
729    let target = selector
730        .target
731        .ok_or_else(|| Status::invalid_argument("selector target is required"))?;
732
733    match target {
734        timeline_seek_selector::Target::Step(step) => repo_step_selection(step),
735        timeline_seek_selector::Target::NativeToolCall(native) => repo_native_selection(native),
736        timeline_seek_selector::Target::Undo(cursor) => {
737            repo_cursor_selection(cursor, RepoTimelineSeekSelector::Undo)
738        }
739        timeline_seek_selector::Target::Redo(cursor) => {
740            repo_cursor_selection(cursor, RepoTimelineSeekSelector::Redo)
741        }
742        timeline_seek_selector::Target::CurrentCursor(cursor) => {
743            repo_cursor_selection(cursor, RepoTimelineSeekSelector::CurrentCursor)
744        }
745    }
746}
747
748fn repo_step_selection(selector: TimelineSeekStepSelector) -> Result<RepoSeekSelection, Status> {
749    if selector.thread.trim().is_empty() {
750        return Err(Status::invalid_argument("selector.step.thread is required"));
751    }
752    if selector.step_id.trim().is_empty() {
753        return Err(Status::invalid_argument(
754            "selector.step.step_id is required",
755        ));
756    }
757    let branch_constraint = non_empty(selector.branch_id)
758        .map(TimelineBranchId::new)
759        .map(TimelineSeekBranchConstraint::Target);
760    Ok(RepoSeekSelection {
761        thread: selector.thread,
762        selector: RepoTimelineSeekSelector::StepId(TimelineStepId::new(selector.step_id)),
763        branch_constraint,
764    })
765}
766
767fn repo_native_selection(
768    selector: TimelineSeekNativeToolCallSelector,
769) -> Result<RepoSeekSelection, Status> {
770    if selector.thread.trim().is_empty() {
771        return Err(Status::invalid_argument(
772            "selector.native_tool_call.thread is required",
773        ));
774    }
775    if selector.harness.trim().is_empty() {
776        return Err(Status::invalid_argument(
777            "selector.native_tool_call.harness is required",
778        ));
779    }
780    if selector.tool_call_id.trim().is_empty() {
781        return Err(Status::invalid_argument(
782            "selector.native_tool_call.tool_call_id is required",
783        ));
784    }
785    Ok(RepoSeekSelection {
786        thread: selector.thread,
787        selector: RepoTimelineSeekSelector::NativeToolCall(TimelineNativeToolKey {
788            harness: selector.harness,
789            session_id: non_empty(selector.session_id),
790            message_id: non_empty(selector.message_id),
791            tool_call_id: selector.tool_call_id,
792        }),
793        branch_constraint: None,
794    })
795}
796
797fn repo_cursor_selection(
798    selector: TimelineCursorSelector,
799    repo_selector: RepoTimelineSeekSelector,
800) -> Result<RepoSeekSelection, Status> {
801    if selector.thread.trim().is_empty() {
802        return Err(Status::invalid_argument(
803            "selector cursor thread is required",
804        ));
805    }
806    let branch_constraint = non_empty(selector.branch_id)
807        .map(TimelineBranchId::new)
808        .map(TimelineSeekBranchConstraint::Current);
809    Ok(RepoSeekSelection {
810        thread: selector.thread,
811        selector: repo_selector,
812        branch_constraint,
813    })
814}
815
816fn repo_seek_preview_to_proto(
817    view: &TimelineView,
818    preview: &RepoTimelineSeekPreview,
819) -> TimelineSeekPreview {
820    let current_step = preview
821        .current_step_id
822        .as_ref()
823        .and_then(|step_id| view.step(&preview.thread, step_id))
824        .map(step_summary_to_proto);
825    let current_step_for_state = preview
826        .current_step_id
827        .as_ref()
828        .and_then(|step_id| view.step(&preview.thread, step_id))
829        .map(step_summary_to_proto);
830    let target_step = preview
831        .target
832        .step_id
833        .as_ref()
834        .and_then(|step_id| view.step(&preview.thread, step_id));
835    let worktree_dirty = preview
836        .worktree_status
837        .as_ref()
838        .is_some_and(|status| !status.is_clean());
839    let blockers = blockers_to_wire_details(&preview.blockers);
840    let materialization_supported = !preview
841        .blockers
842        .iter()
843        .any(|blocker| matches!(blocker, TimelineMaterializationBlocker::UnsupportedMode(_)));
844
845    TimelineSeekPreview {
846        current_status: Some(AgentTimelineStatus {
847            thread: preview.thread.clone(),
848            current_branch_id: preview
849                .current_branch_id
850                .as_ref()
851                .map(|id| id.to_string())
852                .unwrap_or_default(),
853            current_step_id: preview
854                .current_step_id
855                .as_ref()
856                .map(|id| id.to_string())
857                .unwrap_or_default(),
858            current_state: preview
859                .current_state
860                .map(|state| AgentTimelineStateSummary {
861                    state_id: state.as_bytes().to_vec(),
862                    display_id: state.to_string_full(),
863                    source_step_id: preview
864                        .current_step_id
865                        .as_ref()
866                        .map(|id| id.to_string())
867                        .unwrap_or_default(),
868                    payload: current_step_for_state.and_then(|step| step.payload),
869                }),
870            branch_count: view.branch_count(&preview.thread) as u32,
871            step_count: view.step_count(&preview.thread) as u32,
872        }),
873        current_step,
874        target_branch_id: preview.target.branch_id.to_string(),
875        target_step_id: preview
876            .target
877            .step_id
878            .as_ref()
879            .map(|step_id| step_id.to_string())
880            .unwrap_or_default(),
881        target_state: preview.target.state.as_bytes().to_vec(),
882        target_state_display_id: preview.target.state.to_string_full(),
883        target_step: target_step.map(step_summary_to_proto),
884        current_state: preview
885            .current_state
886            .map(|state| state.as_bytes().to_vec())
887            .unwrap_or_default(),
888        current_state_display_id: preview
889            .current_state
890            .map(|state| state.to_string_full())
891            .unwrap_or_default(),
892        changed_paths: preview.changed_paths.clone(),
893        worktree_dirty,
894        worktree_dirty_known: preview.worktree_status.is_some(),
895        blockers,
896        materialization_supported,
897        checkout_state: preview
898            .checkout_state
899            .map(|state| state.as_bytes().to_vec())
900            .unwrap_or_default(),
901        checkout_state_display_id: preview
902            .checkout_state
903            .map(|state| state.to_string_full())
904            .unwrap_or_default(),
905        can_materialize: preview.can_materialize(),
906    }
907}
908
909fn timeline_navigation_snapshot_to_proto(
910    view: &TimelineView,
911    snapshot: RepoTimelineNavigationSnapshot,
912) -> WireTimelineNavigationSnapshot {
913    WireTimelineNavigationSnapshot {
914        thread: snapshot.thread.clone(),
915        cursor: Some(timeline_navigation_cursor_to_proto(snapshot.cursor)),
916        branches: snapshot
917            .branches
918            .into_iter()
919            .map(timeline_navigation_branch_to_proto)
920            .collect(),
921        steps: snapshot
922            .steps
923            .into_iter()
924            .map(timeline_navigation_step_to_proto)
925            .collect(),
926        active_branch_path: snapshot
927            .active_branch_path
928            .iter()
929            .map(ToString::to_string)
930            .collect(),
931        actions: Some(WireTimelineNavigationActionAvailability {
932            can_undo: snapshot.actions.can_undo,
933            can_redo: snapshot.actions.can_redo,
934        }),
935        recovery: snapshot.recovery.map(timeline_navigation_recovery_to_proto),
936        status: Some(status_for_thread(view, &snapshot.thread)),
937    }
938}
939
940fn timeline_navigation_cursor_to_proto(
941    cursor: repo::TimelineNavigationCursor,
942) -> WireTimelineNavigationCursor {
943    WireTimelineNavigationCursor {
944        branch_id: cursor
945            .branch_id
946            .as_ref()
947            .map(ToString::to_string)
948            .unwrap_or_default(),
949        step_id: cursor
950            .step_id
951            .as_ref()
952            .map(ToString::to_string)
953            .unwrap_or_default(),
954        state: cursor
955            .state
956            .map(|state| state.as_bytes().to_vec())
957            .unwrap_or_default(),
958        state_display_id: cursor
959            .state
960            .map(|state| state.to_string_full())
961            .unwrap_or_default(),
962    }
963}
964
965fn timeline_navigation_branch_to_proto(
966    branch: repo::TimelineNavigationBranch,
967) -> WireTimelineNavigationBranch {
968    WireTimelineNavigationBranch {
969        branch_id: branch.branch_id.to_string(),
970        parent_branch_id: branch
971            .parent_branch_id
972            .as_ref()
973            .map(ToString::to_string)
974            .unwrap_or_default(),
975        forked_from_step_id: branch
976            .forked_from_step_id
977            .as_ref()
978            .map(ToString::to_string)
979            .unwrap_or_default(),
980        forked_from_state: branch
981            .forked_from_state
982            .map(|state| state.as_bytes().to_vec())
983            .unwrap_or_default(),
984        forked_from_state_display_id: branch
985            .forked_from_state
986            .map(|state| state.to_string_full())
987            .unwrap_or_default(),
988        reason: branch
989            .reason
990            .as_ref()
991            .map(branch_reason_to_wire)
992            .unwrap_or_default()
993            .to_string(),
994        created_at_ms: branch.created_at_ms.unwrap_or_default(),
995        operation_ids: branch
996            .operation_ids
997            .iter()
998            .map(|id| id.as_bytes().to_vec())
999            .collect(),
1000        operation_display_ids: branch
1001            .operation_ids
1002            .iter()
1003            .map(TimelineOperationId::to_string_full)
1004            .collect(),
1005        step_ids: branch.step_ids.iter().map(ToString::to_string).collect(),
1006        is_active: branch.is_active,
1007        is_on_active_path: branch.is_on_active_path,
1008    }
1009}
1010
1011fn timeline_navigation_step_to_proto(
1012    step: RepoTimelineNavigationStep,
1013) -> WireTimelineNavigationStep {
1014    let cursor_state = step.cursor_state;
1015    WireTimelineNavigationStep {
1016        step: Some(AgentTimelineStepSummary {
1017            thread: step.thread,
1018            step_id: step.step_id.to_string(),
1019            branch_id: step.branch_id.to_string(),
1020            parent_step_id: step
1021                .parent_step_id
1022                .as_ref()
1023                .map(ToString::to_string)
1024                .unwrap_or_default(),
1025            native: step.native.map(native_to_proto),
1026            tool_name: step.tool_name.unwrap_or_default(),
1027            status: step
1028                .status
1029                .as_ref()
1030                .map(tool_call_status_to_wire)
1031                .unwrap_or_default()
1032                .to_string(),
1033            changed: step.changed.unwrap_or(false),
1034            touched_paths: step.touched_paths,
1035            before_state: step
1036                .before_state
1037                .map(|state| state.as_bytes().to_vec())
1038                .unwrap_or_default(),
1039            after_state: step
1040                .after_state
1041                .map(|state| state.as_bytes().to_vec())
1042                .unwrap_or_default(),
1043            capture_state: step
1044                .capture_state
1045                .map(|state| state.as_bytes().to_vec())
1046                .unwrap_or_default(),
1047            payload: timeline_navigation_payload_to_proto(step.payload_summary, step.payload_hash),
1048            labels: step.labels.iter().map(label_to_wire).collect(),
1049            started_at_ms: step.started_at_ms.unwrap_or_default(),
1050            finished_at_ms: step.finished_at_ms.unwrap_or_default(),
1051            operation_ids: step
1052                .operation_ids
1053                .iter()
1054                .map(|id| id.as_bytes().to_vec())
1055                .collect(),
1056            operation_display_ids: step
1057                .operation_ids
1058                .iter()
1059                .map(TimelineOperationId::to_string_full)
1060                .collect(),
1061            capture_oplog_batch_id: step.capture_oplog_batch_id,
1062        }),
1063        cursor_state: cursor_state
1064            .map(|state| state.as_bytes().to_vec())
1065            .unwrap_or_default(),
1066        cursor_state_display_id: cursor_state
1067            .map(|state| state.to_string_full())
1068            .unwrap_or_default(),
1069        is_current: step.is_current,
1070        is_on_active_branch_path: step.is_on_active_branch_path,
1071        can_seek: step.can_seek,
1072        can_fork: step.can_fork,
1073        can_reset: step.can_reset,
1074        can_materialize: step.can_materialize,
1075        has_boundary_warning: step.has_boundary_warning,
1076    }
1077}
1078
1079fn timeline_navigation_payload_to_proto(
1080    summary: Option<String>,
1081    hash: Option<ContentHash>,
1082) -> Option<AgentTimelineToolPayload> {
1083    if summary.is_none() && hash.is_none() {
1084        return None;
1085    }
1086    Some(AgentTimelineToolPayload {
1087        summary: summary.unwrap_or_default(),
1088        hash: hash
1089            .map(|hash| hash.as_bytes().to_vec())
1090            .unwrap_or_default(),
1091    })
1092}
1093
1094fn timeline_navigation_recovery_to_proto(
1095    recovery: repo::TimelineNavigationRecovery,
1096) -> WireTimelineNavigationRecovery {
1097    WireTimelineNavigationRecovery {
1098        status: timeline_navigation_recovery_status_to_proto(recovery.status) as i32,
1099        thread: recovery.thread,
1100        branch_id: recovery.branch_id.to_string(),
1101        from_step_id: recovery
1102            .from_step_id
1103            .as_ref()
1104            .map(ToString::to_string)
1105            .unwrap_or_default(),
1106        to_step_id: recovery
1107            .to_step_id
1108            .as_ref()
1109            .map(ToString::to_string)
1110            .unwrap_or_default(),
1111        from_state: recovery.from_state.as_bytes().to_vec(),
1112        from_state_display_id: recovery.from_state.to_string_full(),
1113        to_state: recovery.to_state.as_bytes().to_vec(),
1114        to_state_display_id: recovery.to_state.to_string_full(),
1115        reason: cursor_reason_to_wire(&recovery.reason).to_string(),
1116        moved_at_ms: recovery.moved_at_ms,
1117        checkout_state: recovery
1118            .checkout_state
1119            .map(|state| state.as_bytes().to_vec())
1120            .unwrap_or_default(),
1121        checkout_state_display_id: recovery
1122            .checkout_state
1123            .map(|state| state.to_string_full())
1124            .unwrap_or_default(),
1125        checkout_state_known: recovery.checkout_state.is_some(),
1126    }
1127}
1128
1129fn timeline_navigation_recovery_status_to_proto(
1130    status: RepoTimelineNavigationRecoveryStatus,
1131) -> WireTimelineNavigationRecoveryStatus {
1132    match status {
1133        RepoTimelineNavigationRecoveryStatus::PendingCursorRecord => {
1134            WireTimelineNavigationRecoveryStatus::PendingCursorRecord
1135        }
1136        RepoTimelineNavigationRecoveryStatus::Blocked => {
1137            WireTimelineNavigationRecoveryStatus::Blocked
1138        }
1139        RepoTimelineNavigationRecoveryStatus::AlreadyApplied => {
1140            WireTimelineNavigationRecoveryStatus::AlreadyApplied
1141        }
1142    }
1143}
1144
1145fn repo_materialize_mode(value: i32) -> RepoTimelineMaterializeMode {
1146    match TimelineMaterializeMode::try_from(value).unwrap_or(TimelineMaterializeMode::Unspecified) {
1147        TimelineMaterializeMode::Unspecified | TimelineMaterializeMode::FailIfDirty => {
1148            RepoTimelineMaterializeMode::FailIfDirty
1149        }
1150        TimelineMaterializeMode::CaptureCurrentThenSeek => {
1151            RepoTimelineMaterializeMode::CaptureCurrentThenSeek
1152        }
1153    }
1154}
1155
1156fn repo_materialize_mode_to_wire(mode: RepoTimelineMaterializeMode) -> &'static str {
1157    match mode {
1158        RepoTimelineMaterializeMode::FailIfDirty => "fail-if-dirty",
1159        RepoTimelineMaterializeMode::CaptureCurrentThenSeek => "capture-current-then-seek",
1160    }
1161}
1162
1163fn repo_materialize_mode_to_wire_code(
1164    mode: RepoTimelineMaterializeMode,
1165) -> TimelineMaterializeMode {
1166    match mode {
1167        RepoTimelineMaterializeMode::FailIfDirty => TimelineMaterializeMode::FailIfDirty,
1168        RepoTimelineMaterializeMode::CaptureCurrentThenSeek => {
1169            TimelineMaterializeMode::CaptureCurrentThenSeek
1170        }
1171    }
1172}
1173
1174fn materialize_status_to_wire_code(
1175    status: &TimelineMaterializeStatus,
1176) -> WireTimelineMaterializeStatus {
1177    match status {
1178        TimelineMaterializeStatus::Materialized => WireTimelineMaterializeStatus::Materialized,
1179        TimelineMaterializeStatus::AlreadyAtTarget => {
1180            WireTimelineMaterializeStatus::AlreadyAtTarget
1181        }
1182        TimelineMaterializeStatus::Refused => WireTimelineMaterializeStatus::Refused,
1183        TimelineMaterializeStatus::Unsupported => WireTimelineMaterializeStatus::Unsupported,
1184        TimelineMaterializeStatus::RecoveryBlocked => {
1185            WireTimelineMaterializeStatus::RecoveryBlocked
1186        }
1187    }
1188}
1189
1190fn recovery_status_to_wire_code(
1191    recovery: &TimelineMaterializationRecoveryOutcome,
1192) -> WireTimelineMaterializationRecoveryStatus {
1193    match recovery.status {
1194        TimelineMaterializationRecoveryStatus::NoPending => {
1195            WireTimelineMaterializationRecoveryStatus::NoPending
1196        }
1197        TimelineMaterializationRecoveryStatus::CursorRecorded => {
1198            WireTimelineMaterializationRecoveryStatus::CursorRecorded
1199        }
1200        TimelineMaterializationRecoveryStatus::AlreadyApplied => {
1201            WireTimelineMaterializationRecoveryStatus::AlreadyApplied
1202        }
1203        TimelineMaterializationRecoveryStatus::Blocked => {
1204            WireTimelineMaterializationRecoveryStatus::Blocked
1205        }
1206    }
1207}
1208
1209fn recovery_blockers_to_wire_details(
1210    recovery: &TimelineMaterializationRecoveryOutcome,
1211) -> Vec<WireTimelineMaterializationRecoveryBlocker> {
1212    recovery
1213        .blocker
1214        .iter()
1215        .map(recovery_blocker_to_wire_detail)
1216        .collect()
1217}
1218
1219fn recovery_blocker_to_wire(blocker: &TimelineMaterializationRecoveryBlocker) -> String {
1220    match blocker {
1221        TimelineMaterializationRecoveryBlocker::CheckoutNotAtTarget {
1222            checkout_state,
1223            target_state,
1224        } => {
1225            let checkout = checkout_state
1226                .map(|state| state.to_string_full())
1227                .unwrap_or_else(|| "unknown".to_string());
1228            format!(
1229                "pending timeline materialization target is {}, but checkout is at {}",
1230                target_state.to_string_full(),
1231                checkout
1232            )
1233        }
1234    }
1235}
1236
1237fn recovery_blocker_to_wire_detail(
1238    blocker: &TimelineMaterializationRecoveryBlocker,
1239) -> WireTimelineMaterializationRecoveryBlocker {
1240    match blocker {
1241        TimelineMaterializationRecoveryBlocker::CheckoutNotAtTarget {
1242            checkout_state,
1243            target_state,
1244        } => WireTimelineMaterializationRecoveryBlocker {
1245            kind: TimelineMaterializationRecoveryBlockerKind::CheckoutNotAtTarget as i32,
1246            message: recovery_blocker_to_wire(blocker),
1247            checkout_state: checkout_state
1248                .as_ref()
1249                .map(|state| state.as_bytes().to_vec())
1250                .unwrap_or_default(),
1251            checkout_state_display_id: checkout_state
1252                .as_ref()
1253                .map(|state| state.to_string_full())
1254                .unwrap_or_default(),
1255            checkout_state_known: checkout_state.is_some(),
1256            target_state: target_state.as_bytes().to_vec(),
1257            target_state_display_id: target_state.to_string_full(),
1258        },
1259    }
1260}
1261
1262fn blockers_to_wire_details(
1263    blockers: &[TimelineMaterializationBlocker],
1264) -> Vec<WireTimelineMaterializationBlocker> {
1265    blockers.iter().map(blocker_to_wire_detail).collect()
1266}
1267
1268fn blocker_to_wire(blocker: &TimelineMaterializationBlocker) -> String {
1269    match blocker {
1270        TimelineMaterializationBlocker::UnsupportedMode(mode) => format!(
1271            "timeline materialization mode '{}' is not supported",
1272            repo_materialize_mode_to_wire(*mode)
1273        ),
1274        TimelineMaterializationBlocker::DirtyWorktree { paths } if paths.is_empty() => {
1275            "worktree has local changes".to_string()
1276        }
1277        TimelineMaterializationBlocker::DirtyWorktree { paths } => {
1278            format!("worktree has local changes: {}", paths.join(", "))
1279        }
1280        TimelineMaterializationBlocker::CheckoutStateUnknown => {
1281            "checkout state is unknown".to_string()
1282        }
1283        TimelineMaterializationBlocker::MissingTree(state) => {
1284            format!("tree for state {} is unavailable", state.to_string_full())
1285        }
1286    }
1287}
1288
1289fn blocker_to_wire_detail(
1290    blocker: &TimelineMaterializationBlocker,
1291) -> WireTimelineMaterializationBlocker {
1292    match blocker {
1293        TimelineMaterializationBlocker::UnsupportedMode(mode) => {
1294            WireTimelineMaterializationBlocker {
1295                kind: TimelineMaterializationBlockerKind::UnsupportedMode as i32,
1296                message: blocker_to_wire(blocker),
1297                unsupported_mode: repo_materialize_mode_to_wire_code(*mode) as i32,
1298                paths: Vec::new(),
1299                state: Vec::new(),
1300                state_display_id: String::new(),
1301            }
1302        }
1303        TimelineMaterializationBlocker::DirtyWorktree { paths } => {
1304            WireTimelineMaterializationBlocker {
1305                kind: TimelineMaterializationBlockerKind::DirtyWorktree as i32,
1306                message: blocker_to_wire(blocker),
1307                unsupported_mode: TimelineMaterializeMode::Unspecified as i32,
1308                paths: paths.clone(),
1309                state: Vec::new(),
1310                state_display_id: String::new(),
1311            }
1312        }
1313        TimelineMaterializationBlocker::CheckoutStateUnknown => {
1314            WireTimelineMaterializationBlocker {
1315                kind: TimelineMaterializationBlockerKind::CheckoutStateUnknown as i32,
1316                message: blocker_to_wire(blocker),
1317                unsupported_mode: TimelineMaterializeMode::Unspecified as i32,
1318                paths: Vec::new(),
1319                state: Vec::new(),
1320                state_display_id: String::new(),
1321            }
1322        }
1323        TimelineMaterializationBlocker::MissingTree(state) => WireTimelineMaterializationBlocker {
1324            kind: TimelineMaterializationBlockerKind::MissingTree as i32,
1325            message: blocker_to_wire(blocker),
1326            unsupported_mode: TimelineMaterializeMode::Unspecified as i32,
1327            paths: Vec::new(),
1328            state: state.as_bytes().to_vec(),
1329            state_display_id: state.to_string_full(),
1330        },
1331    }
1332}
1333
1334fn write_cursor_move(
1335    store: &TimelineStore,
1336    target: &TimelineSeekTarget,
1337    view: &TimelineView,
1338    reason: TimelineCursorMoveReason,
1339) -> Result<TimelineCursorMoveResponse, Status> {
1340    let status = view.status(&target.thread);
1341    let from_step_id = status.and_then(|status| status.current_step_id.clone());
1342    let from_state = status
1343        .and_then(|status| status.current_state)
1344        .unwrap_or(target.state);
1345    let id = store
1346        .record_cursor_move(TimelineCursorMoveRecord {
1347            thread: target.thread.clone(),
1348            branch_id: target.branch_id.clone(),
1349            from_step_id,
1350            to_step_id: target.step_id.clone(),
1351            from_state,
1352            to_state: target.state,
1353            reason,
1354            moved_at_ms: now_ms(),
1355            labels: Vec::new(),
1356        })
1357        .map_err(to_status)?;
1358    let record = record_from_store(store, id)?;
1359    let view = TimelineView::rebuild(store).map_err(to_status)?;
1360    Ok(TimelineCursorMoveResponse {
1361        status: Some(status_for_thread(&view, &target.thread)),
1362        operation: Some(record),
1363    })
1364}
1365
1366fn write_timeline_envelope(
1367    store: &TimelineStore,
1368    envelope: TimelineOperationEnvelope,
1369) -> Result<AgentTimelineOperationRecord, Status> {
1370    let bytes = envelope
1371        .encode()
1372        .map_err(|err| Status::invalid_argument(err.to_string()))?;
1373    let id = store.write_operation_bytes(&bytes).map_err(to_status)?;
1374    record_from_envelope(id, envelope, bytes)
1375}
1376
1377fn parse_seek_reason(value: &str) -> Result<TimelineCursorMoveReason, Status> {
1378    if value.is_empty() || value == "seek-step" {
1379        return Ok(TimelineCursorMoveReason::SeekToolCall);
1380    }
1381    parse_cursor_reason(value)
1382}
1383
1384fn parse_branch_reason_or_default(value: &str) -> Result<TimelineBranchReason, Status> {
1385    if value.is_empty() {
1386        return Ok(TimelineBranchReason::ExplicitFork);
1387    }
1388    parse_branch_reason(value)
1389}
1390
1391fn now_ms() -> i64 {
1392    SystemTime::now()
1393        .duration_since(UNIX_EPOCH)
1394        .map(|duration| duration.as_millis() as i64)
1395        .unwrap_or_default()
1396}
1397
1398fn open_timeline_store_and_view(
1399    inner: &GrpcLocalService,
1400) -> Result<(TimelineStore, TimelineView), Status> {
1401    let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
1402    let view = TimelineView::rebuild(&store).map_err(to_status)?;
1403    Ok((store, view))
1404}
1405
1406fn record_from_store(
1407    store: &TimelineStore,
1408    id: TimelineOperationId,
1409) -> Result<AgentTimelineOperationRecord, Status> {
1410    let bytes = store
1411        .read_operation_bytes(&id)
1412        .map_err(to_status)?
1413        .ok_or_else(|| {
1414            Status::internal(format!(
1415                "timeline operation {} was not persisted",
1416                id.short()
1417            ))
1418        })?;
1419    let envelope = TimelineOperationEnvelope::decode(&bytes)
1420        .map_err(|err| Status::internal(format!("decode stored timeline operation: {err}")))?;
1421    record_from_envelope(id, envelope, bytes)
1422}
1423
1424fn status_for_thread(view: &TimelineView, thread: &str) -> AgentTimelineStatus {
1425    let status = view.status(thread);
1426    AgentTimelineStatus {
1427        thread: thread.to_string(),
1428        current_branch_id: status
1429            .and_then(|status| status.current_branch_id.as_ref())
1430            .map(|id| id.to_string())
1431            .unwrap_or_default(),
1432        current_step_id: status
1433            .and_then(|status| status.current_step_id.as_ref())
1434            .map(|id| id.to_string())
1435            .unwrap_or_default(),
1436        current_state: status.and_then(|status| state_summary_for_status(view, status)),
1437        branch_count: view.branch_count(thread) as u32,
1438        step_count: view.step_count(thread) as u32,
1439    }
1440}
1441
1442fn state_summary_for_status(
1443    view: &TimelineView,
1444    status: &TimelineThreadStatus,
1445) -> Option<AgentTimelineStateSummary> {
1446    let state = status.current_state?;
1447    let step = status
1448        .current_step_id
1449        .as_ref()
1450        .and_then(|step_id| view.step(&status.thread, step_id));
1451    Some(AgentTimelineStateSummary {
1452        state_id: state.as_bytes().to_vec(),
1453        display_id: state.to_string_full(),
1454        source_step_id: status
1455            .current_step_id
1456            .as_ref()
1457            .map(|id| id.to_string())
1458            .unwrap_or_default(),
1459        payload: step.and_then(step_payload_to_proto),
1460    })
1461}
1462
1463fn step_summary_to_proto(step: &TimelineStepSummary) -> AgentTimelineStepSummary {
1464    AgentTimelineStepSummary {
1465        thread: step.thread.clone(),
1466        step_id: step.step_id.to_string(),
1467        branch_id: step.branch_id.to_string(),
1468        parent_step_id: step
1469            .parent_step_id
1470            .as_ref()
1471            .map(|id| id.to_string())
1472            .unwrap_or_default(),
1473        native: step.native.clone().map(native_to_proto),
1474        tool_name: step.tool_name.clone().unwrap_or_default(),
1475        status: step
1476            .status
1477            .as_ref()
1478            .map(tool_call_status_to_wire)
1479            .unwrap_or_default()
1480            .to_string(),
1481        changed: step.changed.unwrap_or(false),
1482        touched_paths: step.touched_paths.clone(),
1483        before_state: step
1484            .before_state
1485            .map(|state| state.as_bytes().to_vec())
1486            .unwrap_or_default(),
1487        after_state: step
1488            .after_state
1489            .map(|state| state.as_bytes().to_vec())
1490            .unwrap_or_default(),
1491        capture_state: step
1492            .capture_state
1493            .map(|state| state.as_bytes().to_vec())
1494            .unwrap_or_default(),
1495        payload: step_payload_to_proto(step),
1496        labels: step.labels.iter().map(label_to_wire).collect(),
1497        started_at_ms: step.started_at_ms.unwrap_or_default(),
1498        finished_at_ms: step.finished_at_ms.unwrap_or_default(),
1499        operation_ids: step
1500            .operation_ids
1501            .iter()
1502            .map(|id| id.as_bytes().to_vec())
1503            .collect(),
1504        operation_display_ids: step
1505            .operation_ids
1506            .iter()
1507            .map(TimelineOperationId::to_string_full)
1508            .collect(),
1509        capture_oplog_batch_id: step.capture_oplog_batch_id,
1510    }
1511}
1512
1513fn step_payload_to_proto(step: &TimelineStepSummary) -> Option<AgentTimelineToolPayload> {
1514    if step.payload_summary.is_none() && step.payload_hash.is_none() {
1515        return None;
1516    }
1517    Some(AgentTimelineToolPayload {
1518        summary: step.payload_summary.clone().unwrap_or_default(),
1519        hash: step
1520            .payload_hash
1521            .map(|hash| hash.as_bytes().to_vec())
1522            .unwrap_or_default(),
1523    })
1524}
1525
1526fn native_key_from_parts(req: &SeekTimelineToNativeToolCallRequest) -> TimelineNativeToolKey {
1527    TimelineNativeToolKey {
1528        harness: req.harness.clone(),
1529        session_id: non_empty(req.session_id.clone()),
1530        message_id: non_empty(req.message_id.clone()),
1531        tool_call_id: req.tool_call_id.clone(),
1532    }
1533}
1534
1535fn native_key_from_resolve_request(req: &ResolveNativeToolCallRequest) -> TimelineNativeToolKey {
1536    TimelineNativeToolKey {
1537        harness: req.harness.clone(),
1538        session_id: non_empty(req.session_id.clone()),
1539        message_id: non_empty(req.message_id.clone()),
1540        tool_call_id: req.tool_call_id.clone(),
1541    }
1542}
1543
1544fn draft_to_envelope(
1545    draft: AgentTimelineOperationDraft,
1546) -> Result<TimelineOperationEnvelope, Status> {
1547    let labels = draft
1548        .labels
1549        .iter()
1550        .map(|label| parse_label(label))
1551        .collect::<Result<Vec<_>, _>>()?;
1552    let body = match draft
1553        .body
1554        .ok_or_else(|| Status::invalid_argument("operation body is required"))?
1555    {
1556        agent_timeline_operation_draft::Body::ToolCallStarted(body) => {
1557            TimelineOperationBodyV1::ToolCallStarted(tool_call_started_from_proto(body)?)
1558        }
1559        agent_timeline_operation_draft::Body::ToolCallFinished(body) => {
1560            TimelineOperationBodyV1::ToolCallFinished(tool_call_finished_from_proto(body)?)
1561        }
1562        agent_timeline_operation_draft::Body::CursorMoved(body) => {
1563            TimelineOperationBodyV1::CursorMoved(cursor_moved_from_proto(body)?)
1564        }
1565        agent_timeline_operation_draft::Body::BranchCreated(body) => {
1566            TimelineOperationBodyV1::BranchCreated(branch_created_from_proto(body)?)
1567        }
1568    };
1569    Ok(TimelineOperationEnvelope::new(body, labels))
1570}
1571
1572fn record_from_envelope(
1573    id: TimelineOperationId,
1574    envelope: TimelineOperationEnvelope,
1575    bytes: Vec<u8>,
1576) -> Result<AgentTimelineOperationRecord, Status> {
1577    let body = match envelope.body {
1578        TimelineOperationBodyV1::ToolCallStarted(body) => {
1579            agent_timeline_operation_record::Body::ToolCallStarted(tool_call_started_to_proto(body))
1580        }
1581        TimelineOperationBodyV1::ToolCallFinished(body) => {
1582            agent_timeline_operation_record::Body::ToolCallFinished(tool_call_finished_to_proto(
1583                body,
1584            ))
1585        }
1586        TimelineOperationBodyV1::CursorMoved(body) => {
1587            agent_timeline_operation_record::Body::CursorMoved(cursor_moved_to_proto(body))
1588        }
1589        TimelineOperationBodyV1::BranchCreated(body) => {
1590            agent_timeline_operation_record::Body::BranchCreated(branch_created_to_proto(body))
1591        }
1592    };
1593    Ok(AgentTimelineOperationRecord {
1594        operation_id: id.as_bytes().to_vec(),
1595        display_id: id.to_string_full(),
1596        schema_version: envelope.schema_version.into(),
1597        kind: envelope.kind.as_str().to_string(),
1598        labels: envelope.labels.iter().map(label_to_wire).collect(),
1599        body: Some(body),
1600        envelope: bytes,
1601    })
1602}
1603
1604fn tool_call_started_from_proto(
1605    body: AgentTimelineToolCallStarted,
1606) -> Result<ToolCallStartedV1, Status> {
1607    Ok(ToolCallStartedV1 {
1608        thread: body.thread,
1609        step_id: TimelineStepId::new(body.step_id),
1610        branch_id: TimelineBranchId::new(body.branch_id),
1611        parent_step_id: optional_step_id(body.parent_step_id),
1612        native: native_from_proto(body.native)?,
1613        tool_name: body.tool_name,
1614        before_state: change_id_from_bytes(&body.before_state, "before_state")?,
1615        payload: payload_from_proto(body.payload)?,
1616        started_at_ms: body.started_at_ms,
1617    })
1618}
1619
1620fn tool_call_finished_from_proto(
1621    body: AgentTimelineToolCallFinished,
1622) -> Result<ToolCallFinishedV1, Status> {
1623    Ok(ToolCallFinishedV1 {
1624        thread: body.thread,
1625        step_id: TimelineStepId::new(body.step_id),
1626        branch_id: TimelineBranchId::new(body.branch_id),
1627        native: native_from_proto(body.native)?,
1628        status: parse_tool_call_status(&body.status)?,
1629        before_state: change_id_from_bytes(&body.before_state, "before_state")?,
1630        after_state: change_id_from_bytes(&body.after_state, "after_state")?,
1631        capture_state: optional_change_id(body.capture_state, "capture_state")?,
1632        capture_oplog_batch_id: body.capture_oplog_batch_id,
1633        changed: body.changed,
1634        touched_paths: body.touched_paths,
1635        payload: payload_from_proto(body.payload)?,
1636        finished_at_ms: body.finished_at_ms,
1637    })
1638}
1639
1640fn cursor_moved_from_proto(body: AgentTimelineCursorMoved) -> Result<CursorMovedV1, Status> {
1641    Ok(CursorMovedV1 {
1642        thread: body.thread,
1643        branch_id: TimelineBranchId::new(body.branch_id),
1644        from_step_id: optional_step_id(body.from_step_id),
1645        to_step_id: optional_step_id(body.to_step_id),
1646        from_state: change_id_from_bytes(&body.from_state, "from_state")?,
1647        to_state: change_id_from_bytes(&body.to_state, "to_state")?,
1648        reason: parse_cursor_reason(&body.reason)?,
1649        moved_at_ms: body.moved_at_ms,
1650    })
1651}
1652
1653fn branch_created_from_proto(body: AgentTimelineBranchCreated) -> Result<BranchCreatedV1, Status> {
1654    Ok(BranchCreatedV1 {
1655        thread: body.thread,
1656        branch_id: TimelineBranchId::new(body.branch_id),
1657        parent_branch_id: optional_branch_id(body.parent_branch_id),
1658        from_step_id: optional_step_id(body.from_step_id),
1659        from_state: change_id_from_bytes(&body.from_state, "from_state")?,
1660        reason: parse_branch_reason(&body.reason)?,
1661        created_at_ms: body.created_at_ms,
1662    })
1663}
1664
1665fn tool_call_started_to_proto(body: ToolCallStartedV1) -> AgentTimelineToolCallStarted {
1666    AgentTimelineToolCallStarted {
1667        thread: body.thread,
1668        step_id: body.step_id.to_string(),
1669        branch_id: body.branch_id.to_string(),
1670        parent_step_id: body
1671            .parent_step_id
1672            .map(|id| id.to_string())
1673            .unwrap_or_default(),
1674        native: Some(native_to_proto(body.native)),
1675        tool_name: body.tool_name,
1676        before_state: body.before_state.as_bytes().to_vec(),
1677        payload: payload_to_proto(body.payload),
1678        started_at_ms: body.started_at_ms,
1679    }
1680}
1681
1682fn tool_call_finished_to_proto(body: ToolCallFinishedV1) -> AgentTimelineToolCallFinished {
1683    AgentTimelineToolCallFinished {
1684        thread: body.thread,
1685        step_id: body.step_id.to_string(),
1686        branch_id: body.branch_id.to_string(),
1687        native: Some(native_to_proto(body.native)),
1688        status: tool_call_status_to_wire(&body.status).to_string(),
1689        before_state: body.before_state.as_bytes().to_vec(),
1690        after_state: body.after_state.as_bytes().to_vec(),
1691        capture_state: body
1692            .capture_state
1693            .map(|state| state.as_bytes().to_vec())
1694            .unwrap_or_default(),
1695        capture_oplog_batch_id: body.capture_oplog_batch_id,
1696        changed: body.changed,
1697        touched_paths: body.touched_paths,
1698        payload: payload_to_proto(body.payload),
1699        finished_at_ms: body.finished_at_ms,
1700    }
1701}
1702
1703fn cursor_moved_to_proto(body: CursorMovedV1) -> AgentTimelineCursorMoved {
1704    AgentTimelineCursorMoved {
1705        thread: body.thread,
1706        branch_id: body.branch_id.to_string(),
1707        from_step_id: body
1708            .from_step_id
1709            .map(|id| id.to_string())
1710            .unwrap_or_default(),
1711        to_step_id: body.to_step_id.map(|id| id.to_string()).unwrap_or_default(),
1712        from_state: body.from_state.as_bytes().to_vec(),
1713        to_state: body.to_state.as_bytes().to_vec(),
1714        reason: cursor_reason_to_wire(&body.reason).to_string(),
1715        moved_at_ms: body.moved_at_ms,
1716    }
1717}
1718
1719fn branch_created_to_proto(body: BranchCreatedV1) -> AgentTimelineBranchCreated {
1720    AgentTimelineBranchCreated {
1721        thread: body.thread,
1722        branch_id: body.branch_id.to_string(),
1723        parent_branch_id: body
1724            .parent_branch_id
1725            .map(|id| id.to_string())
1726            .unwrap_or_default(),
1727        from_step_id: body
1728            .from_step_id
1729            .map(|id| id.to_string())
1730            .unwrap_or_default(),
1731        from_state: body.from_state.as_bytes().to_vec(),
1732        reason: branch_reason_to_wire(&body.reason).to_string(),
1733        created_at_ms: body.created_at_ms,
1734    }
1735}
1736
1737fn optional_step_id(value: String) -> Option<TimelineStepId> {
1738    (!value.is_empty()).then(|| TimelineStepId::new(value))
1739}
1740
1741fn optional_branch_id(value: String) -> Option<TimelineBranchId> {
1742    (!value.is_empty()).then(|| TimelineBranchId::new(value))
1743}
1744
1745fn native_from_proto(
1746    native: Option<AgentTimelineNativeToolCall>,
1747) -> Result<NativeToolCallRefV1, Status> {
1748    let native = native.ok_or_else(|| Status::invalid_argument("native tool call is required"))?;
1749    if native.harness.trim().is_empty() {
1750        return Err(Status::invalid_argument("native.harness must not be empty"));
1751    }
1752    if native.tool_call_id.trim().is_empty() {
1753        return Err(Status::invalid_argument(
1754            "native.tool_call_id must not be empty",
1755        ));
1756    }
1757    Ok(NativeToolCallRefV1 {
1758        harness: native.harness,
1759        session_id: non_empty(native.session_id),
1760        message_id: non_empty(native.message_id),
1761        tool_call_id: native.tool_call_id,
1762    })
1763}
1764
1765fn native_to_proto(native: NativeToolCallRefV1) -> AgentTimelineNativeToolCall {
1766    AgentTimelineNativeToolCall {
1767        harness: native.harness,
1768        session_id: native.session_id.unwrap_or_default(),
1769        message_id: native.message_id.unwrap_or_default(),
1770        tool_call_id: native.tool_call_id,
1771    }
1772}
1773
1774fn payload_from_proto(
1775    payload: Option<AgentTimelineToolPayload>,
1776) -> Result<Option<TimelineToolPayloadMetadata>, Status> {
1777    let Some(payload) = payload else {
1778        return Ok(None);
1779    };
1780    Ok(Some(TimelineToolPayloadMetadata {
1781        summary: non_empty(payload.summary),
1782        hash: optional_content_hash(payload.hash, "payload.hash")?,
1783    }))
1784}
1785
1786fn payload_to_proto(
1787    payload: Option<TimelineToolPayloadMetadata>,
1788) -> Option<AgentTimelineToolPayload> {
1789    payload.map(|payload| AgentTimelineToolPayload {
1790        summary: payload.summary.unwrap_or_default(),
1791        hash: payload
1792            .hash
1793            .map(|hash| hash.as_bytes().to_vec())
1794            .unwrap_or_default(),
1795    })
1796}
1797
1798fn change_id_from_bytes(bytes: &[u8], field: &str) -> Result<ChangeId, Status> {
1799    ChangeId::try_from_slice(bytes)
1800        .map_err(|err| Status::invalid_argument(format!("invalid {field}: {err}")))
1801}
1802
1803fn optional_change_id(bytes: Vec<u8>, field: &str) -> Result<Option<ChangeId>, Status> {
1804    if bytes.is_empty() {
1805        return Ok(None);
1806    }
1807    change_id_from_bytes(&bytes, field).map(Some)
1808}
1809
1810fn optional_content_hash(bytes: Vec<u8>, field: &str) -> Result<Option<ContentHash>, Status> {
1811    if bytes.is_empty() {
1812        return Ok(None);
1813    }
1814    if bytes.len() != 32 {
1815        return Err(Status::invalid_argument(format!(
1816            "invalid {field}: expected 32 bytes"
1817        )));
1818    }
1819    let mut arr = [0u8; 32];
1820    arr.copy_from_slice(&bytes);
1821    Ok(Some(ContentHash::from_bytes(arr)))
1822}
1823
1824fn non_empty(value: String) -> Option<String> {
1825    (!value.is_empty()).then_some(value)
1826}
1827
1828fn parse_label(value: &str) -> Result<TimelineLabel, Status> {
1829    match value {
1830        "repo-reversible" => Ok(TimelineLabel::RepoReversible),
1831        "external-side-effects-unknown" => Ok(TimelineLabel::ExternalSideEffectsUnknown),
1832        "ignored-path-touched" => Ok(TimelineLabel::IgnoredPathTouched),
1833        "outside-repo-touched" => Ok(TimelineLabel::OutsideRepoTouched),
1834        "purge-boundary" => Ok(TimelineLabel::PurgeBoundary),
1835        "capture-failed" => Ok(TimelineLabel::CaptureFailed),
1836        other => Err(Status::invalid_argument(format!(
1837            "unknown timeline label '{other}'"
1838        ))),
1839    }
1840}
1841
1842fn label_to_wire(label: &TimelineLabel) -> String {
1843    match label {
1844        TimelineLabel::RepoReversible => "repo-reversible",
1845        TimelineLabel::ExternalSideEffectsUnknown => "external-side-effects-unknown",
1846        TimelineLabel::IgnoredPathTouched => "ignored-path-touched",
1847        TimelineLabel::OutsideRepoTouched => "outside-repo-touched",
1848        TimelineLabel::PurgeBoundary => "purge-boundary",
1849        TimelineLabel::CaptureFailed => "capture-failed",
1850    }
1851    .to_string()
1852}
1853
1854fn parse_tool_call_status(value: &str) -> Result<TimelineToolCallStatus, Status> {
1855    match value {
1856        "succeeded" => Ok(TimelineToolCallStatus::Succeeded),
1857        "failed" => Ok(TimelineToolCallStatus::Failed),
1858        "cancelled" | "canceled" => Ok(TimelineToolCallStatus::Cancelled),
1859        other => Err(Status::invalid_argument(format!(
1860            "unknown timeline tool call status '{other}'"
1861        ))),
1862    }
1863}
1864
1865fn tool_call_status_to_wire(status: &TimelineToolCallStatus) -> &'static str {
1866    match status {
1867        TimelineToolCallStatus::Succeeded => "succeeded",
1868        TimelineToolCallStatus::Failed => "failed",
1869        TimelineToolCallStatus::Cancelled => "cancelled",
1870    }
1871}
1872
1873fn parse_cursor_reason(value: &str) -> Result<TimelineCursorMoveReason, Status> {
1874    match value {
1875        "seek-tool-call" => Ok(TimelineCursorMoveReason::SeekToolCall),
1876        "undo" => Ok(TimelineCursorMoveReason::Undo),
1877        "redo" => Ok(TimelineCursorMoveReason::Redo),
1878        "reset" => Ok(TimelineCursorMoveReason::Reset),
1879        "auto-advance" => Ok(TimelineCursorMoveReason::AutoAdvance),
1880        other => Err(Status::invalid_argument(format!(
1881            "unknown timeline cursor move reason '{other}'"
1882        ))),
1883    }
1884}
1885
1886fn cursor_reason_to_wire(reason: &TimelineCursorMoveReason) -> &'static str {
1887    match reason {
1888        TimelineCursorMoveReason::SeekToolCall => "seek-tool-call",
1889        TimelineCursorMoveReason::Undo => "undo",
1890        TimelineCursorMoveReason::Redo => "redo",
1891        TimelineCursorMoveReason::Reset => "reset",
1892        TimelineCursorMoveReason::AutoAdvance => "auto-advance",
1893    }
1894}
1895
1896fn parse_branch_reason(value: &str) -> Result<TimelineBranchReason, Status> {
1897    match value {
1898        "edit-from-rewound-cursor" => Ok(TimelineBranchReason::EditFromRewoundCursor),
1899        "explicit-fork" => Ok(TimelineBranchReason::ExplicitFork),
1900        "retry" => Ok(TimelineBranchReason::Retry),
1901        "fan-out" => Ok(TimelineBranchReason::FanOut),
1902        other => Err(Status::invalid_argument(format!(
1903            "unknown timeline branch reason '{other}'"
1904        ))),
1905    }
1906}
1907
1908fn branch_reason_to_wire(reason: &TimelineBranchReason) -> &'static str {
1909    match reason {
1910        TimelineBranchReason::EditFromRewoundCursor => "edit-from-rewound-cursor",
1911        TimelineBranchReason::ExplicitFork => "explicit-fork",
1912        TimelineBranchReason::Retry => "retry",
1913        TimelineBranchReason::FanOut => "fan-out",
1914    }
1915}
1916
1917#[cfg(test)]
1918mod tests {
1919    use std::{fs, sync::Arc};
1920
1921    use grpc::heddle::v1::{
1922        AgentTimelineNativeToolCall, AgentTimelineOperationDraft, AgentTimelineToolCallFinished,
1923        AgentTimelineToolCallStarted, ForkTimelineFromSelectorRequest,
1924        GetTimelineNavigationRequest, GetTimelineOperationRequest, GetTimelineStatusRequest,
1925        ListTimelineStepsRequest, MaterializeTimelineCursorRequest, PreviewTimelineSeekRequest,
1926        RecordTimelineOperationRequest, RecoverTimelineMaterializationRequest,
1927        ResetTimelineCursorRequest, ResolveNativeToolCallRequest,
1928        SeekTimelineToNativeToolCallRequest, SeekTimelineToStepRequest, TimelineCursorSelector,
1929        TimelineMaterializeMode, TimelineSeekNativeToolCallSelector, TimelineSeekSelector,
1930        TimelineSeekStepSelector, agent_timeline_operation_draft, timeline_seek_selector,
1931        timeline_service_server::TimelineService,
1932    };
1933    use repo::{Repository, operation_dedup::OperationDedupStore};
1934    use tempfile::TempDir;
1935    use tonic::Request;
1936
1937    use super::*;
1938
1939    fn fresh_service() -> (TempDir, LocalTimelineService) {
1940        let temp = TempDir::new().unwrap();
1941        let repo = Repository::init_default(temp.path()).unwrap();
1942        let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
1943        let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
1944        (temp, LocalTimelineService::new(inner))
1945    }
1946
1947    async fn record_finished_step(
1948        service: &LocalTimelineService,
1949        step_id: &str,
1950        tool_call_id: &str,
1951        before: u8,
1952        after: u8,
1953        finished_at_ms: i64,
1954    ) {
1955        service
1956            .record_operation(Request::new(RecordTimelineOperationRequest {
1957                repo_path: String::new(),
1958                operation: Some(AgentTimelineOperationDraft {
1959                    labels: vec!["repo-reversible".to_string()],
1960                    body: Some(agent_timeline_operation_draft::Body::ToolCallFinished(
1961                        AgentTimelineToolCallFinished {
1962                            thread: "main".to_string(),
1963                            step_id: step_id.to_string(),
1964                            branch_id: "tlb-main".to_string(),
1965                            native: Some(AgentTimelineNativeToolCall {
1966                                harness: "opencode".to_string(),
1967                                session_id: "session-1".to_string(),
1968                                message_id: "message-1".to_string(),
1969                                tool_call_id: tool_call_id.to_string(),
1970                            }),
1971                            status: "succeeded".to_string(),
1972                            before_state: vec![before; 16],
1973                            after_state: vec![after; 16],
1974                            capture_state: vec![after; 16],
1975                            capture_oplog_batch_id: Some(finished_at_ms as u64),
1976                            changed: true,
1977                            touched_paths: vec![format!("src/{step_id}.rs")],
1978                            payload: None,
1979                            finished_at_ms,
1980                        },
1981                    )),
1982                }),
1983                client_operation_id: String::new(),
1984            }))
1985            .await
1986            .unwrap();
1987    }
1988
1989    fn write_repo_state(
1990        service: &LocalTimelineService,
1991        root: &std::path::Path,
1992        path: &str,
1993        content: &str,
1994    ) -> ChangeId {
1995        fs::write(root.join(path), content).unwrap();
1996        service
1997            .inner
1998            .repo()
1999            .snapshot(Some(path.to_string()), None)
2000            .unwrap()
2001            .change_id
2002    }
2003
2004    async fn record_finished_step_for_states(
2005        service: &LocalTimelineService,
2006        step_id: &str,
2007        tool_call_id: &str,
2008        before: ChangeId,
2009        after: ChangeId,
2010        touched_path: &str,
2011        finished_at_ms: i64,
2012    ) {
2013        service
2014            .record_operation(Request::new(RecordTimelineOperationRequest {
2015                repo_path: String::new(),
2016                operation: Some(AgentTimelineOperationDraft {
2017                    labels: vec!["repo-reversible".to_string()],
2018                    body: Some(agent_timeline_operation_draft::Body::ToolCallFinished(
2019                        AgentTimelineToolCallFinished {
2020                            thread: "main".to_string(),
2021                            step_id: step_id.to_string(),
2022                            branch_id: "tlb-main".to_string(),
2023                            native: Some(AgentTimelineNativeToolCall {
2024                                harness: "opencode".to_string(),
2025                                session_id: "session-1".to_string(),
2026                                message_id: "message-1".to_string(),
2027                                tool_call_id: tool_call_id.to_string(),
2028                            }),
2029                            status: "succeeded".to_string(),
2030                            before_state: before.as_bytes().to_vec(),
2031                            after_state: after.as_bytes().to_vec(),
2032                            capture_state: after.as_bytes().to_vec(),
2033                            capture_oplog_batch_id: Some(finished_at_ms as u64),
2034                            changed: true,
2035                            touched_paths: vec![touched_path.to_string()],
2036                            payload: None,
2037                            finished_at_ms,
2038                        },
2039                    )),
2040                }),
2041                client_operation_id: String::new(),
2042            }))
2043            .await
2044            .unwrap();
2045    }
2046
2047    #[tokio::test]
2048    async fn record_operation_stores_canonical_timeline_object() {
2049        let (_temp, service) = fresh_service();
2050        let response = service
2051            .record_operation(Request::new(RecordTimelineOperationRequest {
2052                repo_path: String::new(),
2053                operation: Some(AgentTimelineOperationDraft {
2054                    labels: vec!["repo-reversible".to_string()],
2055                    body: Some(agent_timeline_operation_draft::Body::ToolCallStarted(
2056                        AgentTimelineToolCallStarted {
2057                            thread: "main".to_string(),
2058                            step_id: "tls-step".to_string(),
2059                            branch_id: "tlb-main".to_string(),
2060                            parent_step_id: String::new(),
2061                            native: Some(AgentTimelineNativeToolCall {
2062                                harness: "opencode".to_string(),
2063                                session_id: "session-1".to_string(),
2064                                message_id: "message-1".to_string(),
2065                                tool_call_id: "call-1".to_string(),
2066                            }),
2067                            tool_name: "shell".to_string(),
2068                            before_state: vec![1; 16],
2069                            payload: None,
2070                            started_at_ms: 1_700_000_000_000,
2071                        },
2072                    )),
2073                }),
2074                client_operation_id: String::new(),
2075            }))
2076            .await
2077            .unwrap()
2078            .into_inner();
2079
2080        assert_eq!(response.kind, "tool_call_started");
2081        assert_eq!(response.labels, vec!["repo-reversible"]);
2082        assert_eq!(response.schema_version, 1);
2083        assert_eq!(response.operation_id.len(), 32);
2084        assert!(!response.envelope.is_empty());
2085
2086        let read = service
2087            .get_operation(Request::new(GetTimelineOperationRequest {
2088                repo_path: String::new(),
2089                operation_id: response.operation_id.clone(),
2090            }))
2091            .await
2092            .unwrap()
2093            .into_inner();
2094
2095        assert_eq!(read.operation_id, response.operation_id);
2096        assert_eq!(read.envelope, response.envelope);
2097        assert_eq!(read.display_id, response.display_id);
2098    }
2099
2100    #[tokio::test]
2101    async fn list_status_and_seek_follow_recorded_tool_calls() {
2102        let (_temp, service) = fresh_service();
2103        record_finished_step(&service, "tls-step-1", "call-1", 1, 2, 1_700_000_000_000).await;
2104        record_finished_step(&service, "tls-step-2", "call-2", 2, 3, 1_700_000_000_100).await;
2105
2106        let status = service
2107            .get_timeline_status(Request::new(GetTimelineStatusRequest {
2108                repo_path: String::new(),
2109                thread: "main".to_string(),
2110            }))
2111            .await
2112            .unwrap()
2113            .into_inner();
2114        assert_eq!(status.current_branch_id, "tlb-main");
2115        assert_eq!(status.current_step_id, "tls-step-2");
2116        assert_eq!(status.current_state.unwrap().state_id, vec![3; 16]);
2117        assert_eq!(status.step_count, 2);
2118
2119        let listed = service
2120            .list_timeline_steps(Request::new(ListTimelineStepsRequest {
2121                repo_path: String::new(),
2122                thread: "main".to_string(),
2123                branch_id: "tlb-main".to_string(),
2124                limit: 0,
2125            }))
2126            .await
2127            .unwrap()
2128            .into_inner();
2129        assert_eq!(listed.steps.len(), 2);
2130        assert_eq!(listed.steps[0].step_id, "tls-step-1");
2131        assert_eq!(listed.steps[1].step_id, "tls-step-2");
2132        assert_eq!(listed.steps[1].status, "succeeded");
2133        assert_eq!(listed.steps[1].touched_paths, vec!["src/tls-step-2.rs"]);
2134
2135        let resolved = service
2136            .resolve_native_tool_call(Request::new(ResolveNativeToolCallRequest {
2137                repo_path: String::new(),
2138                thread: "main".to_string(),
2139                harness: "opencode".to_string(),
2140                session_id: "session-1".to_string(),
2141                message_id: "message-1".to_string(),
2142                tool_call_id: "call-1".to_string(),
2143            }))
2144            .await
2145            .unwrap()
2146            .into_inner();
2147        assert_eq!(resolved.step_id, "tls-step-1");
2148
2149        let moved = service
2150            .seek_to_step(Request::new(SeekTimelineToStepRequest {
2151                repo_path: String::new(),
2152                thread: "main".to_string(),
2153                branch_id: "tlb-main".to_string(),
2154                step_id: "tls-step-1".to_string(),
2155                reason: "seek-step".to_string(),
2156                client_operation_id: String::new(),
2157            }))
2158            .await
2159            .unwrap()
2160            .into_inner();
2161        assert_eq!(moved.operation.unwrap().kind, "cursor_moved");
2162        let moved_status = moved.status.unwrap();
2163        assert_eq!(moved_status.current_step_id, "tls-step-1");
2164        assert_eq!(moved_status.current_state.unwrap().state_id, vec![2; 16]);
2165
2166        let moved = service
2167            .seek_to_native_tool_call(Request::new(SeekTimelineToNativeToolCallRequest {
2168                repo_path: String::new(),
2169                thread: "main".to_string(),
2170                harness: "opencode".to_string(),
2171                session_id: "session-1".to_string(),
2172                message_id: "message-1".to_string(),
2173                tool_call_id: "call-2".to_string(),
2174                reason: String::new(),
2175                client_operation_id: String::new(),
2176            }))
2177            .await
2178            .unwrap()
2179            .into_inner();
2180        let moved_status = moved.status.unwrap();
2181        assert_eq!(moved_status.current_step_id, "tls-step-2");
2182        assert_eq!(moved_status.current_state.unwrap().state_id, vec![3; 16]);
2183    }
2184
2185    #[tokio::test]
2186    async fn get_timeline_navigation_returns_cursor_actions_and_native_ids() {
2187        let (_temp, service) = fresh_service();
2188        record_finished_step(&service, "tls-step-1", "call-1", 1, 2, 1_700_000_000_000).await;
2189        record_finished_step(&service, "tls-step-2", "call-2", 2, 3, 1_700_000_000_100).await;
2190
2191        let navigation = service
2192            .get_timeline_navigation(Request::new(GetTimelineNavigationRequest {
2193                repo_path: String::new(),
2194                thread: "main".to_string(),
2195            }))
2196            .await
2197            .unwrap()
2198            .into_inner();
2199
2200        assert_eq!(navigation.thread, "main");
2201        assert_eq!(navigation.cursor.unwrap().step_id, "tls-step-2");
2202        assert_eq!(navigation.branches.len(), 1);
2203        assert_eq!(navigation.active_branch_path, vec!["tlb-main"]);
2204        assert!(navigation.actions.as_ref().unwrap().can_undo);
2205        assert!(!navigation.actions.as_ref().unwrap().can_redo);
2206        assert!(navigation.recovery.is_none());
2207
2208        let current = navigation
2209            .steps
2210            .iter()
2211            .find(|step| step.is_current)
2212            .expect("current step");
2213        let summary = current.step.as_ref().unwrap();
2214        assert_eq!(summary.step_id, "tls-step-2");
2215        assert_eq!(summary.native.as_ref().unwrap().harness, "opencode");
2216        assert_eq!(summary.native.as_ref().unwrap().tool_call_id, "call-2");
2217        assert_eq!(current.cursor_state, vec![3; 16]);
2218    }
2219
2220    #[tokio::test]
2221    async fn preview_timeline_seek_resolves_step_and_native_selectors() {
2222        let (temp, service) = fresh_service();
2223        let state0 = service.inner.repo().head().unwrap().unwrap();
2224        let state1 = write_repo_state(&service, temp.path(), "tracked.txt", "one\n");
2225        let state2 = write_repo_state(&service, temp.path(), "tracked.txt", "two\n");
2226        record_finished_step_for_states(
2227            &service,
2228            "tls-step-1",
2229            "call-1",
2230            state0,
2231            state1,
2232            "tracked.txt",
2233            1_700_000_000_000,
2234        )
2235        .await;
2236        record_finished_step_for_states(
2237            &service,
2238            "tls-step-2",
2239            "call-2",
2240            state1,
2241            state2,
2242            "tracked.txt",
2243            1_700_000_000_100,
2244        )
2245        .await;
2246
2247        let by_step = service
2248            .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2249                repo_path: String::new(),
2250                selector: Some(TimelineSeekSelector {
2251                    target: Some(timeline_seek_selector::Target::Step(
2252                        TimelineSeekStepSelector {
2253                            thread: "main".to_string(),
2254                            branch_id: "tlb-main".to_string(),
2255                            step_id: "tls-step-1".to_string(),
2256                        },
2257                    )),
2258                }),
2259                mode: TimelineMaterializeMode::FailIfDirty as i32,
2260            }))
2261            .await
2262            .unwrap()
2263            .into_inner();
2264
2265        assert_eq!(
2266            by_step.current_status.unwrap().current_step_id,
2267            "tls-step-2"
2268        );
2269        assert_eq!(by_step.current_state, state2.as_bytes().to_vec());
2270        assert_eq!(by_step.checkout_state, state2.as_bytes().to_vec());
2271        assert_eq!(by_step.target_branch_id, "tlb-main");
2272        assert_eq!(by_step.target_step_id, "tls-step-1");
2273        assert_eq!(by_step.target_state, state1.as_bytes().to_vec());
2274        assert_eq!(by_step.changed_paths, vec!["tracked.txt"]);
2275        assert!(by_step.worktree_dirty_known);
2276        assert!(!by_step.worktree_dirty);
2277        assert!(by_step.materialization_supported);
2278        assert!(by_step.can_materialize);
2279        assert!(by_step.blockers.is_empty());
2280
2281        let by_native = service
2282            .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2283                repo_path: String::new(),
2284                selector: Some(TimelineSeekSelector {
2285                    target: Some(timeline_seek_selector::Target::NativeToolCall(
2286                        TimelineSeekNativeToolCallSelector {
2287                            thread: "main".to_string(),
2288                            harness: "opencode".to_string(),
2289                            session_id: "session-1".to_string(),
2290                            message_id: "message-1".to_string(),
2291                            tool_call_id: "call-2".to_string(),
2292                        },
2293                    )),
2294                }),
2295                mode: TimelineMaterializeMode::FailIfDirty as i32,
2296            }))
2297            .await
2298            .unwrap()
2299            .into_inner();
2300
2301        assert_eq!(by_native.target_step_id, "tls-step-2");
2302        assert_eq!(by_native.target_state, state2.as_bytes().to_vec());
2303
2304        let current_cursor = service
2305            .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2306                repo_path: String::new(),
2307                selector: Some(TimelineSeekSelector {
2308                    target: Some(timeline_seek_selector::Target::CurrentCursor(
2309                        TimelineCursorSelector {
2310                            thread: "main".to_string(),
2311                            branch_id: "tlb-main".to_string(),
2312                        },
2313                    )),
2314                }),
2315                mode: TimelineMaterializeMode::FailIfDirty as i32,
2316            }))
2317            .await
2318            .unwrap()
2319            .into_inner();
2320
2321        assert_eq!(current_cursor.target_step_id, "tls-step-2");
2322        assert_eq!(current_cursor.target_state, state2.as_bytes().to_vec());
2323        assert!(current_cursor.can_materialize);
2324
2325        let unsupported = service
2326            .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2327                repo_path: String::new(),
2328                selector: Some(TimelineSeekSelector {
2329                    target: Some(timeline_seek_selector::Target::CurrentCursor(
2330                        TimelineCursorSelector {
2331                            thread: "main".to_string(),
2332                            branch_id: "tlb-main".to_string(),
2333                        },
2334                    )),
2335                }),
2336                mode: TimelineMaterializeMode::CaptureCurrentThenSeek as i32,
2337            }))
2338            .await
2339            .unwrap()
2340            .into_inner();
2341
2342        assert!(!unsupported.can_materialize);
2343        assert_eq!(unsupported.blockers.len(), 1);
2344        assert_eq!(
2345            unsupported.blockers[0].kind,
2346            TimelineMaterializationBlockerKind::UnsupportedMode as i32
2347        );
2348        assert_eq!(
2349            unsupported.blockers[0].unsupported_mode,
2350            TimelineMaterializeMode::CaptureCurrentThenSeek as i32
2351        );
2352    }
2353
2354    #[tokio::test]
2355    async fn materialize_timeline_cursor_moves_checkout_and_records_cursor() {
2356        let (temp, service) = fresh_service();
2357        let state0 = service.inner.repo().head().unwrap().unwrap();
2358        let state1 = write_repo_state(&service, temp.path(), "tracked.txt", "one\n");
2359        let state2 = write_repo_state(&service, temp.path(), "tracked.txt", "two\n");
2360        record_finished_step_for_states(
2361            &service,
2362            "tls-step-1",
2363            "call-1",
2364            state0,
2365            state1,
2366            "tracked.txt",
2367            1_700_000_000_000,
2368        )
2369        .await;
2370        record_finished_step_for_states(
2371            &service,
2372            "tls-step-2",
2373            "call-2",
2374            state1,
2375            state2,
2376            "tracked.txt",
2377            1_700_000_000_100,
2378        )
2379        .await;
2380
2381        let response = service
2382            .materialize_timeline_cursor(Request::new(MaterializeTimelineCursorRequest {
2383                repo_path: String::new(),
2384                selector: Some(TimelineSeekSelector {
2385                    target: Some(timeline_seek_selector::Target::Step(
2386                        TimelineSeekStepSelector {
2387                            thread: "main".to_string(),
2388                            branch_id: "tlb-main".to_string(),
2389                            step_id: "tls-step-1".to_string(),
2390                        },
2391                    )),
2392                }),
2393                mode: TimelineMaterializeMode::FailIfDirty as i32,
2394                client_operation_id: String::new(),
2395            }))
2396            .await
2397            .unwrap()
2398            .into_inner();
2399
2400        assert!(response.materialized);
2401        assert_eq!(
2402            response.status,
2403            WireTimelineMaterializeStatus::Materialized as i32
2404        );
2405        assert_eq!(
2406            response.recovery_status,
2407            WireTimelineMaterializationRecoveryStatus::NoPending as i32
2408        );
2409        assert!(response.recovered_cursor_operation.is_none());
2410        assert!(response.recovery_blockers.is_empty());
2411        assert_eq!(response.cursor_operation.unwrap().kind, "cursor_moved");
2412        assert_eq!(
2413            response.preview.as_ref().unwrap().target_step_id,
2414            "tls-step-1"
2415        );
2416        assert_eq!(
2417            response.updated_status.unwrap().current_step_id,
2418            "tls-step-1",
2419            "materialization should update the logical cursor after moving the checkout"
2420        );
2421        assert!(response.blockers.is_empty());
2422        assert_eq!(service.inner.repo().head().unwrap(), Some(state1));
2423        assert_eq!(
2424            fs::read_to_string(temp.path().join("tracked.txt")).unwrap(),
2425            "one\n"
2426        );
2427    }
2428
2429    #[tokio::test]
2430    async fn timeline_action_rpcs_return_navigation_and_records() {
2431        let (_temp, service) = fresh_service();
2432        record_finished_step(&service, "tls-step-1", "call-1", 1, 2, 1_700_000_000_000).await;
2433        record_finished_step(&service, "tls-step-2", "call-2", 2, 3, 1_700_000_000_100).await;
2434
2435        let fork = service
2436            .fork_timeline_from_selector(Request::new(ForkTimelineFromSelectorRequest {
2437                repo_path: String::new(),
2438                selector: Some(TimelineSeekSelector {
2439                    target: Some(timeline_seek_selector::Target::NativeToolCall(
2440                        TimelineSeekNativeToolCallSelector {
2441                            thread: "main".to_string(),
2442                            harness: "opencode".to_string(),
2443                            session_id: "session-1".to_string(),
2444                            message_id: "message-1".to_string(),
2445                            tool_call_id: "call-1".to_string(),
2446                        },
2447                    )),
2448                }),
2449                branch_id: "tlb-child".to_string(),
2450                reason: "fan-out".to_string(),
2451                client_operation_id: String::new(),
2452            }))
2453            .await
2454            .unwrap()
2455            .into_inner();
2456
2457        assert_eq!(fork.branch_id, "tlb-child");
2458        assert_eq!(fork.parent_branch_id, "tlb-main");
2459        assert_eq!(fork.from_step_id, "tls-step-1");
2460        assert_eq!(fork.operation.unwrap().kind, "branch_created");
2461        assert!(
2462            fork.navigation
2463                .as_ref()
2464                .unwrap()
2465                .branches
2466                .iter()
2467                .any(|branch| branch.branch_id == "tlb-child")
2468        );
2469
2470        let reset = service
2471            .reset_timeline_cursor(Request::new(ResetTimelineCursorRequest {
2472                repo_path: String::new(),
2473                selector: Some(TimelineSeekSelector {
2474                    target: Some(timeline_seek_selector::Target::Step(
2475                        TimelineSeekStepSelector {
2476                            thread: "main".to_string(),
2477                            branch_id: "tlb-main".to_string(),
2478                            step_id: "tls-step-1".to_string(),
2479                        },
2480                    )),
2481                }),
2482                mode: TimelineMaterializeMode::FailIfDirty as i32,
2483                materialize_checkout: false,
2484                client_operation_id: String::new(),
2485            }))
2486            .await
2487            .unwrap()
2488            .into_inner();
2489
2490        assert_eq!(reset.cursor_operation.unwrap().kind, "cursor_moved");
2491        assert!(reset.materialization.is_none());
2492        assert_eq!(
2493            reset.navigation.unwrap().cursor.unwrap().step_id,
2494            "tls-step-1"
2495        );
2496
2497        let recovery = service
2498            .recover_timeline_materialization(Request::new(RecoverTimelineMaterializationRequest {
2499                repo_path: String::new(),
2500                thread: "main".to_string(),
2501                client_operation_id: String::new(),
2502            }))
2503            .await
2504            .unwrap()
2505            .into_inner();
2506
2507        assert_eq!(
2508            recovery.recovery_status,
2509            WireTimelineMaterializationRecoveryStatus::NoPending as i32
2510        );
2511        assert!(recovery.recovered_cursor_operation.is_none());
2512        assert!(recovery.recovery_blockers.is_empty());
2513    }
2514}