1use std::time::{SystemTime, UNIX_EPOCH};
5
6use grpc::heddle::v1::{
7 AgentTimelineBranchCreated, AgentTimelineCursorMoved, AgentTimelineNativeToolCall,
8 AgentTimelineOperationDraft, AgentTimelineOperationRecord, AgentTimelineStateSummary,
9 AgentTimelineStatus, AgentTimelineStepSummary, AgentTimelineToolCallFinished,
10 AgentTimelineToolCallStarted, AgentTimelineToolPayload, CreateTimelineBranchRequest,
11 CreateTimelineBranchResponse, ForkTimelineFromSelectorRequest,
12 ForkTimelineFromSelectorResponse, GetTimelineNavigationRequest, GetTimelineOperationRequest,
13 GetTimelineStatusRequest, ListTimelineStepsRequest, ListTimelineStepsResponse,
14 MaterializeTimelineCursorRequest, PreviewTimelineSeekRequest, RecordTimelineOperationRequest,
15 RecoverTimelineMaterializationRequest, RecoverTimelineMaterializationResponse,
16 ResetTimelineCursorRequest, ResetTimelineCursorResponse, ResolveNativeToolCallRequest,
17 SeekTimelineToNativeToolCallRequest, SeekTimelineToStepRequest, TimelineCursorMoveResponse,
18 TimelineCursorRequest, TimelineCursorSelector,
19 TimelineMaterializationBlocker as WireTimelineMaterializationBlocker,
20 TimelineMaterializationBlockerKind,
21 TimelineMaterializationRecoveryBlocker as WireTimelineMaterializationRecoveryBlocker,
22 TimelineMaterializationRecoveryBlockerKind,
23 TimelineMaterializationRecoveryStatus as WireTimelineMaterializationRecoveryStatus,
24 TimelineMaterializeMode, TimelineMaterializeResponse,
25 TimelineMaterializeStatus as WireTimelineMaterializeStatus,
26 TimelineNavigationActionAvailability as WireTimelineNavigationActionAvailability,
27 TimelineNavigationBranch as WireTimelineNavigationBranch,
28 TimelineNavigationCursor as WireTimelineNavigationCursor,
29 TimelineNavigationRecovery as WireTimelineNavigationRecovery,
30 TimelineNavigationRecoveryStatus as WireTimelineNavigationRecoveryStatus,
31 TimelineNavigationSnapshot as WireTimelineNavigationSnapshot,
32 TimelineNavigationStep as WireTimelineNavigationStep, TimelineSeekNativeToolCallSelector,
33 TimelineSeekPreview, TimelineSeekSelector, TimelineSeekStepSelector,
34 agent_timeline_operation_draft, agent_timeline_operation_record, timeline_seek_selector,
35 timeline_service_server::TimelineService,
36};
37use objects::object::{
38 BranchCreatedV1, ChangeId, ContentHash, CursorMovedV1, NativeToolCallRefV1, TimelineBranchId,
39 TimelineBranchReason, TimelineCursorMoveReason, TimelineLabel, TimelineOperationBodyV1,
40 TimelineOperationEnvelope, TimelineOperationId, TimelineStepId, TimelineToolCallStatus,
41 TimelineToolPayloadMetadata, ToolCallFinishedV1, ToolCallStartedV1,
42};
43use prost::Message;
44use repo::{
45 TimelineCursorMoveRecord, TimelineMaterializationBlocker,
46 TimelineMaterializationRecoveryBlocker, TimelineMaterializationRecoveryOutcome,
47 TimelineMaterializationRecoveryStatus, TimelineMaterializeMode as RepoTimelineMaterializeMode,
48 TimelineMaterializeOutcome, TimelineMaterializeStatus, TimelineNativeToolKey,
49 TimelineNavigationRecoveryStatus as RepoTimelineNavigationRecoveryStatus,
50 TimelineNavigationSnapshot as RepoTimelineNavigationSnapshot,
51 TimelineNavigationStep as RepoTimelineNavigationStep, TimelineSeekBranchConstraint,
52 TimelineSeekPreview as RepoTimelineSeekPreview,
53 TimelineSeekSelector as RepoTimelineSeekSelector, TimelineSeekTarget, TimelineStepSummary,
54 TimelineStore, TimelineThreadStatus, TimelineView,
55};
56use tonic::{Request, Response, Status};
57
58use super::{GrpcLocalService, to_status, with_idempotency};
59
60#[derive(Clone)]
61pub struct LocalTimelineService {
62 inner: GrpcLocalService,
63}
64
65impl LocalTimelineService {
66 pub fn new(inner: GrpcLocalService) -> Self {
67 Self { inner }
68 }
69}
70
71#[tonic::async_trait]
72impl TimelineService for LocalTimelineService {
73 async fn record_operation(
74 &self,
75 request: Request<RecordTimelineOperationRequest>,
76 ) -> Result<Response<AgentTimelineOperationRecord>, Status> {
77 let req = request.into_inner();
78 let body = req.encode_to_vec();
79 let client_op = req.client_operation_id.clone();
80 let inner = self.inner.clone();
81
82 let response = with_idempotency(
83 &self.inner,
84 &client_op,
85 "TimelineService.RecordOperation",
86 &body,
87 move || async move {
88 let draft = req
89 .operation
90 .ok_or_else(|| Status::invalid_argument("operation is required"))?;
91 let envelope = draft_to_envelope(draft)?;
92 let bytes = envelope
93 .encode()
94 .map_err(|err| Status::invalid_argument(err.to_string()))?;
95 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
96 let id = store.write_operation_bytes(&bytes).map_err(to_status)?;
97 record_from_envelope(id, envelope, bytes)
98 },
99 )
100 .await?;
101
102 Ok(Response::new(response))
103 }
104
105 async fn get_operation(
106 &self,
107 request: Request<GetTimelineOperationRequest>,
108 ) -> Result<Response<AgentTimelineOperationRecord>, Status> {
109 let req = request.into_inner();
110 let id = TimelineOperationId::try_from_slice(&req.operation_id)
111 .map_err(|err| Status::invalid_argument(format!("invalid operation_id: {err}")))?;
112 let store = TimelineStore::open(self.inner.repo().heddle_dir()).map_err(to_status)?;
113 let bytes = store
114 .read_operation_bytes(&id)
115 .map_err(to_status)?
116 .ok_or_else(|| Status::not_found(format!("timeline operation {}", id.short())))?;
117 let envelope = TimelineOperationEnvelope::decode(&bytes)
118 .map_err(|err| Status::internal(format!("decode stored timeline operation: {err}")))?;
119 Ok(Response::new(record_from_envelope(id, envelope, bytes)?))
120 }
121
122 async fn get_timeline_status(
123 &self,
124 request: Request<GetTimelineStatusRequest>,
125 ) -> Result<Response<AgentTimelineStatus>, Status> {
126 let req = request.into_inner();
127 let (_store, view) = open_timeline_store_and_view(&self.inner)?;
128 Ok(Response::new(status_for_thread(&view, &req.thread)))
129 }
130
131 async fn get_timeline_navigation(
132 &self,
133 request: Request<GetTimelineNavigationRequest>,
134 ) -> Result<Response<WireTimelineNavigationSnapshot>, Status> {
135 let req = request.into_inner();
136 let snapshot = get_timeline_navigation_impl(&self.inner, req)?;
137 Ok(Response::new(snapshot))
138 }
139
140 async fn list_timeline_steps(
141 &self,
142 request: Request<ListTimelineStepsRequest>,
143 ) -> Result<Response<ListTimelineStepsResponse>, Status> {
144 let req = request.into_inner();
145 let (_store, view) = open_timeline_store_and_view(&self.inner)?;
146 let branch_id = if req.branch_id.is_empty() {
147 view.status(&req.thread)
148 .and_then(|status| status.current_branch_id.clone())
149 } else {
150 Some(TimelineBranchId::new(req.branch_id.clone()))
151 };
152 let mut steps = branch_id
153 .as_ref()
154 .map(|branch_id| {
155 view.list_branch_steps(&req.thread, branch_id)
156 .into_iter()
157 .map(step_summary_to_proto)
158 .collect::<Vec<_>>()
159 })
160 .unwrap_or_default();
161 if req.limit > 0 && steps.len() > req.limit as usize {
162 let keep_from = steps.len() - req.limit as usize;
163 steps = steps.split_off(keep_from);
164 }
165 Ok(Response::new(ListTimelineStepsResponse {
166 steps,
167 status: Some(status_for_thread(&view, &req.thread)),
168 }))
169 }
170
171 async fn resolve_native_tool_call(
172 &self,
173 request: Request<ResolveNativeToolCallRequest>,
174 ) -> Result<Response<AgentTimelineStepSummary>, Status> {
175 let req = request.into_inner();
176 let (_store, view) = open_timeline_store_and_view(&self.inner)?;
177 let native = native_key_from_resolve_request(&req);
178 let step = view
179 .find_step_by_native_call(&req.thread, &native)
180 .ok_or_else(|| Status::not_found("native tool call not found"))?;
181 Ok(Response::new(step_summary_to_proto(step)))
182 }
183
184 async fn seek_to_step(
185 &self,
186 request: Request<SeekTimelineToStepRequest>,
187 ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
188 let req = request.into_inner();
189 let body = req.encode_to_vec();
190 let client_op = req.client_operation_id.clone();
191 let inner = self.inner.clone();
192
193 let response = with_idempotency(
194 &self.inner,
195 &client_op,
196 "TimelineService.SeekToStep",
197 &body,
198 move || async move { seek_to_step_impl(&inner, req).await },
199 )
200 .await?;
201
202 Ok(Response::new(response))
203 }
204
205 async fn seek_to_native_tool_call(
206 &self,
207 request: Request<SeekTimelineToNativeToolCallRequest>,
208 ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
209 let req = request.into_inner();
210 let body = req.encode_to_vec();
211 let client_op = req.client_operation_id.clone();
212 let inner = self.inner.clone();
213
214 let response = with_idempotency(
215 &self.inner,
216 &client_op,
217 "TimelineService.SeekToNativeToolCall",
218 &body,
219 move || async move { seek_to_native_tool_call_impl(&inner, req).await },
220 )
221 .await?;
222
223 Ok(Response::new(response))
224 }
225
226 async fn undo_tool_call(
227 &self,
228 request: Request<TimelineCursorRequest>,
229 ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
230 let req = request.into_inner();
231 let body = req.encode_to_vec();
232 let client_op = req.client_operation_id.clone();
233 let inner = self.inner.clone();
234
235 let response = with_idempotency(
236 &self.inner,
237 &client_op,
238 "TimelineService.UndoToolCall",
239 &body,
240 move || async move { move_cursor_by_delta_impl(&inner, req, -1).await },
241 )
242 .await?;
243
244 Ok(Response::new(response))
245 }
246
247 async fn redo_tool_call(
248 &self,
249 request: Request<TimelineCursorRequest>,
250 ) -> Result<Response<TimelineCursorMoveResponse>, Status> {
251 let req = request.into_inner();
252 let body = req.encode_to_vec();
253 let client_op = req.client_operation_id.clone();
254 let inner = self.inner.clone();
255
256 let response = with_idempotency(
257 &self.inner,
258 &client_op,
259 "TimelineService.RedoToolCall",
260 &body,
261 move || async move { move_cursor_by_delta_impl(&inner, req, 1).await },
262 )
263 .await?;
264
265 Ok(Response::new(response))
266 }
267
268 async fn create_timeline_branch(
269 &self,
270 request: Request<CreateTimelineBranchRequest>,
271 ) -> Result<Response<CreateTimelineBranchResponse>, Status> {
272 let req = request.into_inner();
273 let body = req.encode_to_vec();
274 let client_op = req.client_operation_id.clone();
275 let inner = self.inner.clone();
276
277 let response = with_idempotency(
278 &self.inner,
279 &client_op,
280 "TimelineService.CreateTimelineBranch",
281 &body,
282 move || async move { create_timeline_branch_impl(&inner, req).await },
283 )
284 .await?;
285
286 Ok(Response::new(response))
287 }
288
289 async fn preview_timeline_seek(
290 &self,
291 request: Request<PreviewTimelineSeekRequest>,
292 ) -> Result<Response<TimelineSeekPreview>, Status> {
293 let req = request.into_inner();
294 let preview = preview_timeline_seek_impl(&self.inner, req)?;
295 Ok(Response::new(preview))
296 }
297
298 async fn materialize_timeline_cursor(
299 &self,
300 request: Request<MaterializeTimelineCursorRequest>,
301 ) -> Result<Response<TimelineMaterializeResponse>, Status> {
302 let req = request.into_inner();
303 let body = req.encode_to_vec();
304 let client_op = req.client_operation_id.clone();
305 let inner = self.inner.clone();
306
307 let response = with_idempotency(
308 &self.inner,
309 &client_op,
310 "TimelineService.MaterializeTimelineCursor",
311 &body,
312 move || async move { materialize_timeline_cursor_impl(&inner, req).await },
313 )
314 .await?;
315
316 Ok(Response::new(response))
317 }
318
319 async fn fork_timeline_from_selector(
320 &self,
321 request: Request<ForkTimelineFromSelectorRequest>,
322 ) -> Result<Response<ForkTimelineFromSelectorResponse>, Status> {
323 let req = request.into_inner();
324 let body = req.encode_to_vec();
325 let client_op = req.client_operation_id.clone();
326 let inner = self.inner.clone();
327
328 let response = with_idempotency(
329 &self.inner,
330 &client_op,
331 "TimelineService.ForkTimelineFromSelector",
332 &body,
333 move || async move { fork_timeline_from_selector_impl(&inner, req).await },
334 )
335 .await?;
336
337 Ok(Response::new(response))
338 }
339
340 async fn reset_timeline_cursor(
341 &self,
342 request: Request<ResetTimelineCursorRequest>,
343 ) -> Result<Response<ResetTimelineCursorResponse>, Status> {
344 let req = request.into_inner();
345 let body = req.encode_to_vec();
346 let client_op = req.client_operation_id.clone();
347 let inner = self.inner.clone();
348
349 let response = with_idempotency(
350 &self.inner,
351 &client_op,
352 "TimelineService.ResetTimelineCursor",
353 &body,
354 move || async move { reset_timeline_cursor_impl(&inner, req).await },
355 )
356 .await?;
357
358 Ok(Response::new(response))
359 }
360
361 async fn recover_timeline_materialization(
362 &self,
363 request: Request<RecoverTimelineMaterializationRequest>,
364 ) -> Result<Response<RecoverTimelineMaterializationResponse>, Status> {
365 let req = request.into_inner();
366 let body = req.encode_to_vec();
367 let client_op = req.client_operation_id.clone();
368 let inner = self.inner.clone();
369
370 let response = with_idempotency(
371 &self.inner,
372 &client_op,
373 "TimelineService.RecoverTimelineMaterialization",
374 &body,
375 move || async move { recover_timeline_materialization_impl(&inner, req).await },
376 )
377 .await?;
378
379 Ok(Response::new(response))
380 }
381}
382
383async fn seek_to_step_impl(
384 inner: &GrpcLocalService,
385 req: SeekTimelineToStepRequest,
386) -> Result<TimelineCursorMoveResponse, Status> {
387 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
388 let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
389 let view = TimelineView::rebuild(&store).map_err(to_status)?;
390 let target = view
391 .resolve_seek_target(&req.thread, &TimelineStepId::new(req.step_id.clone()))
392 .ok_or_else(|| Status::not_found("timeline step not found"))?;
393 if !req.branch_id.is_empty() && target.branch_id.as_str() != req.branch_id {
394 return Err(Status::failed_precondition(
395 "timeline step belongs to a different branch",
396 ));
397 }
398 write_cursor_move(&store, &target, &view, parse_seek_reason(&req.reason)?)
399}
400
401async fn seek_to_native_tool_call_impl(
402 inner: &GrpcLocalService,
403 req: SeekTimelineToNativeToolCallRequest,
404) -> Result<TimelineCursorMoveResponse, Status> {
405 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
406 let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
407 let view = TimelineView::rebuild(&store).map_err(to_status)?;
408 let target = view
409 .resolve_seek_to_native_call(&req.thread, &native_key_from_parts(&req))
410 .ok_or_else(|| Status::not_found("native tool call not found"))?
411 .clone();
412 write_cursor_move(&store, &target, &view, parse_seek_reason(&req.reason)?)
413}
414
415async fn move_cursor_by_delta_impl(
416 inner: &GrpcLocalService,
417 req: TimelineCursorRequest,
418 delta: i32,
419) -> Result<TimelineCursorMoveResponse, Status> {
420 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
421 let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
422 let view = TimelineView::rebuild(&store).map_err(to_status)?;
423 if !req.branch_id.is_empty()
424 && view
425 .status(&req.thread)
426 .and_then(|status| status.current_branch_id.as_ref())
427 .is_some_and(|branch_id| branch_id.as_str() != req.branch_id)
428 {
429 return Err(Status::failed_precondition(
430 "timeline cursor is on a different branch",
431 ));
432 }
433 let target = if delta < 0 {
434 view.resolve_undo_target(&req.thread)
435 } else {
436 view.resolve_redo_target(&req.thread)
437 }
438 .ok_or_else(|| Status::failed_precondition("timeline cursor is not initialized"))?;
439 write_cursor_move(
440 &store,
441 &target,
442 &view,
443 if delta < 0 {
444 TimelineCursorMoveReason::Undo
445 } else {
446 TimelineCursorMoveReason::Redo
447 },
448 )
449}
450
451async fn create_timeline_branch_impl(
452 inner: &GrpcLocalService,
453 req: CreateTimelineBranchRequest,
454) -> Result<CreateTimelineBranchResponse, Status> {
455 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
456 let _record_guard = store.lock_recording(&req.thread).map_err(to_status)?;
457 let view = TimelineView::rebuild(&store).map_err(to_status)?;
458 let target = if req.from_step_id.is_empty() {
459 let status = view.status(&req.thread).ok_or_else(|| {
460 Status::failed_precondition("from_step_id is required when the cursor has no step")
461 })?;
462 let branch_id = status.current_branch_id.clone().ok_or_else(|| {
463 Status::failed_precondition("from_step_id is required when the cursor has no branch")
464 })?;
465 let state = status.current_state.ok_or_else(|| {
466 Status::failed_precondition("from_step_id is required when the cursor has no state")
467 })?;
468 TimelineSeekTarget {
469 thread: req.thread.clone(),
470 branch_id,
471 step_id: status.current_step_id.clone(),
472 state,
473 }
474 } else {
475 view.resolve_seek_target(&req.thread, &TimelineStepId::new(req.from_step_id.clone()))
476 .ok_or_else(|| Status::not_found("timeline step not found"))?
477 };
478 let branch_id = if req.branch_id.is_empty() {
479 TimelineBranchId::generate()
480 } else {
481 TimelineBranchId::new(req.branch_id)
482 };
483 let body = BranchCreatedV1 {
484 thread: req.thread.clone(),
485 branch_id: branch_id.clone(),
486 parent_branch_id: Some(target.branch_id.clone()),
487 from_step_id: target.step_id.clone(),
488 from_state: target.state,
489 reason: parse_branch_reason_or_default(&req.reason)?,
490 created_at_ms: now_ms(),
491 };
492 let record = write_timeline_envelope(
493 &store,
494 TimelineOperationEnvelope::new(TimelineOperationBodyV1::BranchCreated(body), Vec::new()),
495 )?;
496 let view = TimelineView::rebuild(&store).map_err(to_status)?;
497 Ok(CreateTimelineBranchResponse {
498 status: Some(status_for_thread(&view, &req.thread)),
499 branch_id: branch_id.to_string(),
500 parent_branch_id: target.branch_id.to_string(),
501 from_step_id: target.step_id.map(|id| id.to_string()).unwrap_or_default(),
502 operation: Some(record),
503 })
504}
505
506fn get_timeline_navigation_impl(
507 inner: &GrpcLocalService,
508 req: GetTimelineNavigationRequest,
509) -> Result<WireTimelineNavigationSnapshot, Status> {
510 if req.thread.trim().is_empty() {
511 return Err(Status::invalid_argument("thread is required"));
512 }
513 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
514 let snapshot = inner
515 .repo()
516 .timeline_navigation_snapshot(&store, &req.thread)
517 .map_err(to_status)?;
518 let view = TimelineView::rebuild(&store).map_err(to_status)?;
519 Ok(timeline_navigation_snapshot_to_proto(&view, snapshot))
520}
521
522fn preview_timeline_seek_impl(
523 inner: &GrpcLocalService,
524 req: PreviewTimelineSeekRequest,
525) -> Result<TimelineSeekPreview, Status> {
526 let selection = repo_seek_selection(req.selector)?;
527 let mode = repo_materialize_mode(req.mode);
528 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
529 let preview = inner
530 .repo()
531 .preview_timeline_seek_constrained(
532 &store,
533 &selection.thread,
534 &selection.selector,
535 mode,
536 selection.branch_constraint.as_ref(),
537 )
538 .map_err(to_status)?;
539 let view = TimelineView::rebuild(&store).map_err(to_status)?;
540 Ok(repo_seek_preview_to_proto(&view, &preview))
541}
542
543async fn materialize_timeline_cursor_impl(
544 inner: &GrpcLocalService,
545 req: MaterializeTimelineCursorRequest,
546) -> Result<TimelineMaterializeResponse, Status> {
547 let selection = repo_seek_selection(req.selector)?;
548 let mode = repo_materialize_mode(req.mode);
549 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
550 let before_view = TimelineView::rebuild(&store).map_err(to_status)?;
551
552 let outcome = inner
553 .repo()
554 .materialize_timeline_cursor_constrained(
555 &store,
556 &selection.thread,
557 &selection.selector,
558 mode,
559 selection.branch_constraint.as_ref(),
560 now_ms(),
561 )
562 .map_err(to_status)?;
563 materialize_response_from_outcome(&store, &before_view, &selection.thread, outcome)
564}
565
566async fn fork_timeline_from_selector_impl(
567 inner: &GrpcLocalService,
568 req: ForkTimelineFromSelectorRequest,
569) -> Result<ForkTimelineFromSelectorResponse, Status> {
570 let selection = repo_seek_selection(req.selector)?;
571 let branch_id = non_empty(req.branch_id).map(TimelineBranchId::new);
572 let reason = parse_branch_reason_or_default(&req.reason)?;
573 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
574 let outcome = inner
575 .repo()
576 .fork_timeline_from_selector(
577 &store,
578 &selection.thread,
579 &selection.selector,
580 selection.branch_constraint.as_ref(),
581 branch_id,
582 reason,
583 now_ms(),
584 )
585 .map_err(to_status)?;
586 let operation = record_from_store(&store, outcome.operation_id)?;
587 let view = TimelineView::rebuild(&store).map_err(to_status)?;
588
589 Ok(ForkTimelineFromSelectorResponse {
590 navigation: Some(timeline_navigation_snapshot_to_proto(
591 &view,
592 outcome.navigation,
593 )),
594 operation: Some(operation),
595 branch_id: outcome.branch_id.to_string(),
596 parent_branch_id: outcome.parent_branch_id.to_string(),
597 from_step_id: outcome
598 .from_step_id
599 .map(|step_id| step_id.to_string())
600 .unwrap_or_default(),
601 })
602}
603
604async fn reset_timeline_cursor_impl(
605 inner: &GrpcLocalService,
606 req: ResetTimelineCursorRequest,
607) -> Result<ResetTimelineCursorResponse, Status> {
608 let selection = repo_seek_selection(req.selector)?;
609 let mode = repo_materialize_mode(req.mode);
610 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
611 let before_view = TimelineView::rebuild(&store).map_err(to_status)?;
612 let outcome = inner
613 .repo()
614 .reset_timeline_cursor(
615 &store,
616 &selection.thread,
617 &selection.selector,
618 mode,
619 selection.branch_constraint.as_ref(),
620 req.materialize_checkout,
621 now_ms(),
622 )
623 .map_err(to_status)?;
624 let cursor_operation = outcome
625 .cursor_operation_id
626 .map(|id| record_from_store(&store, id))
627 .transpose()?;
628 let materialization = outcome
629 .materialization
630 .map(|materialization| {
631 materialize_response_from_outcome(
632 &store,
633 &before_view,
634 &selection.thread,
635 materialization,
636 )
637 })
638 .transpose()?;
639 let view = TimelineView::rebuild(&store).map_err(to_status)?;
640
641 Ok(ResetTimelineCursorResponse {
642 navigation: Some(timeline_navigation_snapshot_to_proto(
643 &view,
644 outcome.navigation,
645 )),
646 cursor_operation,
647 materialization,
648 })
649}
650
651async fn recover_timeline_materialization_impl(
652 inner: &GrpcLocalService,
653 req: RecoverTimelineMaterializationRequest,
654) -> Result<RecoverTimelineMaterializationResponse, Status> {
655 if req.thread.trim().is_empty() {
656 return Err(Status::invalid_argument("thread is required"));
657 }
658 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
659 let outcome = inner
660 .repo()
661 .recover_timeline_materialization_action(&store, &req.thread)
662 .map_err(to_status)?;
663 let recovered_cursor_operation = outcome
664 .recovery
665 .cursor_operation_id
666 .map(|id| record_from_store(&store, id))
667 .transpose()?;
668 let recovery_status = recovery_status_to_wire_code(&outcome.recovery);
669 let recovery_blockers = recovery_blockers_to_wire_details(&outcome.recovery);
670 let view = TimelineView::rebuild(&store).map_err(to_status)?;
671
672 Ok(RecoverTimelineMaterializationResponse {
673 navigation: Some(timeline_navigation_snapshot_to_proto(
674 &view,
675 outcome.navigation,
676 )),
677 recovered_cursor_operation,
678 recovery_status: recovery_status as i32,
679 recovery_blockers,
680 })
681}
682
683fn materialize_response_from_outcome(
684 store: &TimelineStore,
685 before_view: &TimelineView,
686 thread: &str,
687 outcome: TimelineMaterializeOutcome,
688) -> Result<TimelineMaterializeResponse, Status> {
689 let status = outcome.status.clone();
690 let cursor_operation = outcome
691 .cursor_operation_id
692 .map(|id| record_from_store(store, id))
693 .transpose()?;
694 let recovered_cursor_operation = outcome
695 .recovery
696 .cursor_operation_id
697 .map(|id| record_from_store(store, id))
698 .transpose()?;
699 let after_view = TimelineView::rebuild(store).map_err(to_status)?;
700 let blockers = blockers_to_wire_details(&outcome.preview.blockers);
701 let recovery_blockers = recovery_blockers_to_wire_details(&outcome.recovery);
702
703 Ok(TimelineMaterializeResponse {
704 updated_status: Some(status_for_thread(&after_view, thread)),
705 preview: Some(repo_seek_preview_to_proto(before_view, &outcome.preview)),
706 cursor_operation,
707 materialized: matches!(
708 status,
709 TimelineMaterializeStatus::Materialized | TimelineMaterializeStatus::AlreadyAtTarget
710 ),
711 blockers,
712 status: materialize_status_to_wire_code(&status) as i32,
713 recovered_cursor_operation,
714 recovery_status: recovery_status_to_wire_code(&outcome.recovery) as i32,
715 recovery_blockers,
716 })
717}
718
719struct RepoSeekSelection {
720 thread: String,
721 selector: RepoTimelineSeekSelector,
722 branch_constraint: Option<TimelineSeekBranchConstraint>,
723}
724
725fn repo_seek_selection(
726 selector: Option<TimelineSeekSelector>,
727) -> Result<RepoSeekSelection, Status> {
728 let selector = selector.ok_or_else(|| Status::invalid_argument("selector is required"))?;
729 let target = selector
730 .target
731 .ok_or_else(|| Status::invalid_argument("selector target is required"))?;
732
733 match target {
734 timeline_seek_selector::Target::Step(step) => repo_step_selection(step),
735 timeline_seek_selector::Target::NativeToolCall(native) => repo_native_selection(native),
736 timeline_seek_selector::Target::Undo(cursor) => {
737 repo_cursor_selection(cursor, RepoTimelineSeekSelector::Undo)
738 }
739 timeline_seek_selector::Target::Redo(cursor) => {
740 repo_cursor_selection(cursor, RepoTimelineSeekSelector::Redo)
741 }
742 timeline_seek_selector::Target::CurrentCursor(cursor) => {
743 repo_cursor_selection(cursor, RepoTimelineSeekSelector::CurrentCursor)
744 }
745 }
746}
747
748fn repo_step_selection(selector: TimelineSeekStepSelector) -> Result<RepoSeekSelection, Status> {
749 if selector.thread.trim().is_empty() {
750 return Err(Status::invalid_argument("selector.step.thread is required"));
751 }
752 if selector.step_id.trim().is_empty() {
753 return Err(Status::invalid_argument(
754 "selector.step.step_id is required",
755 ));
756 }
757 let branch_constraint = non_empty(selector.branch_id)
758 .map(TimelineBranchId::new)
759 .map(TimelineSeekBranchConstraint::Target);
760 Ok(RepoSeekSelection {
761 thread: selector.thread,
762 selector: RepoTimelineSeekSelector::StepId(TimelineStepId::new(selector.step_id)),
763 branch_constraint,
764 })
765}
766
767fn repo_native_selection(
768 selector: TimelineSeekNativeToolCallSelector,
769) -> Result<RepoSeekSelection, Status> {
770 if selector.thread.trim().is_empty() {
771 return Err(Status::invalid_argument(
772 "selector.native_tool_call.thread is required",
773 ));
774 }
775 if selector.harness.trim().is_empty() {
776 return Err(Status::invalid_argument(
777 "selector.native_tool_call.harness is required",
778 ));
779 }
780 if selector.tool_call_id.trim().is_empty() {
781 return Err(Status::invalid_argument(
782 "selector.native_tool_call.tool_call_id is required",
783 ));
784 }
785 Ok(RepoSeekSelection {
786 thread: selector.thread,
787 selector: RepoTimelineSeekSelector::NativeToolCall(TimelineNativeToolKey {
788 harness: selector.harness,
789 session_id: non_empty(selector.session_id),
790 message_id: non_empty(selector.message_id),
791 tool_call_id: selector.tool_call_id,
792 }),
793 branch_constraint: None,
794 })
795}
796
797fn repo_cursor_selection(
798 selector: TimelineCursorSelector,
799 repo_selector: RepoTimelineSeekSelector,
800) -> Result<RepoSeekSelection, Status> {
801 if selector.thread.trim().is_empty() {
802 return Err(Status::invalid_argument(
803 "selector cursor thread is required",
804 ));
805 }
806 let branch_constraint = non_empty(selector.branch_id)
807 .map(TimelineBranchId::new)
808 .map(TimelineSeekBranchConstraint::Current);
809 Ok(RepoSeekSelection {
810 thread: selector.thread,
811 selector: repo_selector,
812 branch_constraint,
813 })
814}
815
816fn repo_seek_preview_to_proto(
817 view: &TimelineView,
818 preview: &RepoTimelineSeekPreview,
819) -> TimelineSeekPreview {
820 let current_step = preview
821 .current_step_id
822 .as_ref()
823 .and_then(|step_id| view.step(&preview.thread, step_id))
824 .map(step_summary_to_proto);
825 let current_step_for_state = preview
826 .current_step_id
827 .as_ref()
828 .and_then(|step_id| view.step(&preview.thread, step_id))
829 .map(step_summary_to_proto);
830 let target_step = preview
831 .target
832 .step_id
833 .as_ref()
834 .and_then(|step_id| view.step(&preview.thread, step_id));
835 let worktree_dirty = preview
836 .worktree_status
837 .as_ref()
838 .is_some_and(|status| !status.is_clean());
839 let blockers = blockers_to_wire_details(&preview.blockers);
840 let materialization_supported = !preview
841 .blockers
842 .iter()
843 .any(|blocker| matches!(blocker, TimelineMaterializationBlocker::UnsupportedMode(_)));
844
845 TimelineSeekPreview {
846 current_status: Some(AgentTimelineStatus {
847 thread: preview.thread.clone(),
848 current_branch_id: preview
849 .current_branch_id
850 .as_ref()
851 .map(|id| id.to_string())
852 .unwrap_or_default(),
853 current_step_id: preview
854 .current_step_id
855 .as_ref()
856 .map(|id| id.to_string())
857 .unwrap_or_default(),
858 current_state: preview
859 .current_state
860 .map(|state| AgentTimelineStateSummary {
861 state_id: state.as_bytes().to_vec(),
862 display_id: state.to_string_full(),
863 source_step_id: preview
864 .current_step_id
865 .as_ref()
866 .map(|id| id.to_string())
867 .unwrap_or_default(),
868 payload: current_step_for_state.and_then(|step| step.payload),
869 }),
870 branch_count: view.branch_count(&preview.thread) as u32,
871 step_count: view.step_count(&preview.thread) as u32,
872 }),
873 current_step,
874 target_branch_id: preview.target.branch_id.to_string(),
875 target_step_id: preview
876 .target
877 .step_id
878 .as_ref()
879 .map(|step_id| step_id.to_string())
880 .unwrap_or_default(),
881 target_state: preview.target.state.as_bytes().to_vec(),
882 target_state_display_id: preview.target.state.to_string_full(),
883 target_step: target_step.map(step_summary_to_proto),
884 current_state: preview
885 .current_state
886 .map(|state| state.as_bytes().to_vec())
887 .unwrap_or_default(),
888 current_state_display_id: preview
889 .current_state
890 .map(|state| state.to_string_full())
891 .unwrap_or_default(),
892 changed_paths: preview.changed_paths.clone(),
893 worktree_dirty,
894 worktree_dirty_known: preview.worktree_status.is_some(),
895 blockers,
896 materialization_supported,
897 checkout_state: preview
898 .checkout_state
899 .map(|state| state.as_bytes().to_vec())
900 .unwrap_or_default(),
901 checkout_state_display_id: preview
902 .checkout_state
903 .map(|state| state.to_string_full())
904 .unwrap_or_default(),
905 can_materialize: preview.can_materialize(),
906 }
907}
908
909fn timeline_navigation_snapshot_to_proto(
910 view: &TimelineView,
911 snapshot: RepoTimelineNavigationSnapshot,
912) -> WireTimelineNavigationSnapshot {
913 WireTimelineNavigationSnapshot {
914 thread: snapshot.thread.clone(),
915 cursor: Some(timeline_navigation_cursor_to_proto(snapshot.cursor)),
916 branches: snapshot
917 .branches
918 .into_iter()
919 .map(timeline_navigation_branch_to_proto)
920 .collect(),
921 steps: snapshot
922 .steps
923 .into_iter()
924 .map(timeline_navigation_step_to_proto)
925 .collect(),
926 active_branch_path: snapshot
927 .active_branch_path
928 .iter()
929 .map(ToString::to_string)
930 .collect(),
931 actions: Some(WireTimelineNavigationActionAvailability {
932 can_undo: snapshot.actions.can_undo,
933 can_redo: snapshot.actions.can_redo,
934 }),
935 recovery: snapshot.recovery.map(timeline_navigation_recovery_to_proto),
936 status: Some(status_for_thread(view, &snapshot.thread)),
937 }
938}
939
940fn timeline_navigation_cursor_to_proto(
941 cursor: repo::TimelineNavigationCursor,
942) -> WireTimelineNavigationCursor {
943 WireTimelineNavigationCursor {
944 branch_id: cursor
945 .branch_id
946 .as_ref()
947 .map(ToString::to_string)
948 .unwrap_or_default(),
949 step_id: cursor
950 .step_id
951 .as_ref()
952 .map(ToString::to_string)
953 .unwrap_or_default(),
954 state: cursor
955 .state
956 .map(|state| state.as_bytes().to_vec())
957 .unwrap_or_default(),
958 state_display_id: cursor
959 .state
960 .map(|state| state.to_string_full())
961 .unwrap_or_default(),
962 }
963}
964
965fn timeline_navigation_branch_to_proto(
966 branch: repo::TimelineNavigationBranch,
967) -> WireTimelineNavigationBranch {
968 WireTimelineNavigationBranch {
969 branch_id: branch.branch_id.to_string(),
970 parent_branch_id: branch
971 .parent_branch_id
972 .as_ref()
973 .map(ToString::to_string)
974 .unwrap_or_default(),
975 forked_from_step_id: branch
976 .forked_from_step_id
977 .as_ref()
978 .map(ToString::to_string)
979 .unwrap_or_default(),
980 forked_from_state: branch
981 .forked_from_state
982 .map(|state| state.as_bytes().to_vec())
983 .unwrap_or_default(),
984 forked_from_state_display_id: branch
985 .forked_from_state
986 .map(|state| state.to_string_full())
987 .unwrap_or_default(),
988 reason: branch
989 .reason
990 .as_ref()
991 .map(branch_reason_to_wire)
992 .unwrap_or_default()
993 .to_string(),
994 created_at_ms: branch.created_at_ms.unwrap_or_default(),
995 operation_ids: branch
996 .operation_ids
997 .iter()
998 .map(|id| id.as_bytes().to_vec())
999 .collect(),
1000 operation_display_ids: branch
1001 .operation_ids
1002 .iter()
1003 .map(TimelineOperationId::to_string_full)
1004 .collect(),
1005 step_ids: branch.step_ids.iter().map(ToString::to_string).collect(),
1006 is_active: branch.is_active,
1007 is_on_active_path: branch.is_on_active_path,
1008 }
1009}
1010
1011fn timeline_navigation_step_to_proto(
1012 step: RepoTimelineNavigationStep,
1013) -> WireTimelineNavigationStep {
1014 let cursor_state = step.cursor_state;
1015 WireTimelineNavigationStep {
1016 step: Some(AgentTimelineStepSummary {
1017 thread: step.thread,
1018 step_id: step.step_id.to_string(),
1019 branch_id: step.branch_id.to_string(),
1020 parent_step_id: step
1021 .parent_step_id
1022 .as_ref()
1023 .map(ToString::to_string)
1024 .unwrap_or_default(),
1025 native: step.native.map(native_to_proto),
1026 tool_name: step.tool_name.unwrap_or_default(),
1027 status: step
1028 .status
1029 .as_ref()
1030 .map(tool_call_status_to_wire)
1031 .unwrap_or_default()
1032 .to_string(),
1033 changed: step.changed.unwrap_or(false),
1034 touched_paths: step.touched_paths,
1035 before_state: step
1036 .before_state
1037 .map(|state| state.as_bytes().to_vec())
1038 .unwrap_or_default(),
1039 after_state: step
1040 .after_state
1041 .map(|state| state.as_bytes().to_vec())
1042 .unwrap_or_default(),
1043 capture_state: step
1044 .capture_state
1045 .map(|state| state.as_bytes().to_vec())
1046 .unwrap_or_default(),
1047 payload: timeline_navigation_payload_to_proto(step.payload_summary, step.payload_hash),
1048 labels: step.labels.iter().map(label_to_wire).collect(),
1049 started_at_ms: step.started_at_ms.unwrap_or_default(),
1050 finished_at_ms: step.finished_at_ms.unwrap_or_default(),
1051 operation_ids: step
1052 .operation_ids
1053 .iter()
1054 .map(|id| id.as_bytes().to_vec())
1055 .collect(),
1056 operation_display_ids: step
1057 .operation_ids
1058 .iter()
1059 .map(TimelineOperationId::to_string_full)
1060 .collect(),
1061 capture_oplog_batch_id: step.capture_oplog_batch_id,
1062 }),
1063 cursor_state: cursor_state
1064 .map(|state| state.as_bytes().to_vec())
1065 .unwrap_or_default(),
1066 cursor_state_display_id: cursor_state
1067 .map(|state| state.to_string_full())
1068 .unwrap_or_default(),
1069 is_current: step.is_current,
1070 is_on_active_branch_path: step.is_on_active_branch_path,
1071 can_seek: step.can_seek,
1072 can_fork: step.can_fork,
1073 can_reset: step.can_reset,
1074 can_materialize: step.can_materialize,
1075 has_boundary_warning: step.has_boundary_warning,
1076 }
1077}
1078
1079fn timeline_navigation_payload_to_proto(
1080 summary: Option<String>,
1081 hash: Option<ContentHash>,
1082) -> Option<AgentTimelineToolPayload> {
1083 if summary.is_none() && hash.is_none() {
1084 return None;
1085 }
1086 Some(AgentTimelineToolPayload {
1087 summary: summary.unwrap_or_default(),
1088 hash: hash
1089 .map(|hash| hash.as_bytes().to_vec())
1090 .unwrap_or_default(),
1091 })
1092}
1093
1094fn timeline_navigation_recovery_to_proto(
1095 recovery: repo::TimelineNavigationRecovery,
1096) -> WireTimelineNavigationRecovery {
1097 WireTimelineNavigationRecovery {
1098 status: timeline_navigation_recovery_status_to_proto(recovery.status) as i32,
1099 thread: recovery.thread,
1100 branch_id: recovery.branch_id.to_string(),
1101 from_step_id: recovery
1102 .from_step_id
1103 .as_ref()
1104 .map(ToString::to_string)
1105 .unwrap_or_default(),
1106 to_step_id: recovery
1107 .to_step_id
1108 .as_ref()
1109 .map(ToString::to_string)
1110 .unwrap_or_default(),
1111 from_state: recovery.from_state.as_bytes().to_vec(),
1112 from_state_display_id: recovery.from_state.to_string_full(),
1113 to_state: recovery.to_state.as_bytes().to_vec(),
1114 to_state_display_id: recovery.to_state.to_string_full(),
1115 reason: cursor_reason_to_wire(&recovery.reason).to_string(),
1116 moved_at_ms: recovery.moved_at_ms,
1117 checkout_state: recovery
1118 .checkout_state
1119 .map(|state| state.as_bytes().to_vec())
1120 .unwrap_or_default(),
1121 checkout_state_display_id: recovery
1122 .checkout_state
1123 .map(|state| state.to_string_full())
1124 .unwrap_or_default(),
1125 checkout_state_known: recovery.checkout_state.is_some(),
1126 }
1127}
1128
1129fn timeline_navigation_recovery_status_to_proto(
1130 status: RepoTimelineNavigationRecoveryStatus,
1131) -> WireTimelineNavigationRecoveryStatus {
1132 match status {
1133 RepoTimelineNavigationRecoveryStatus::PendingCursorRecord => {
1134 WireTimelineNavigationRecoveryStatus::PendingCursorRecord
1135 }
1136 RepoTimelineNavigationRecoveryStatus::Blocked => {
1137 WireTimelineNavigationRecoveryStatus::Blocked
1138 }
1139 RepoTimelineNavigationRecoveryStatus::AlreadyApplied => {
1140 WireTimelineNavigationRecoveryStatus::AlreadyApplied
1141 }
1142 }
1143}
1144
1145fn repo_materialize_mode(value: i32) -> RepoTimelineMaterializeMode {
1146 match TimelineMaterializeMode::try_from(value).unwrap_or(TimelineMaterializeMode::Unspecified) {
1147 TimelineMaterializeMode::Unspecified | TimelineMaterializeMode::FailIfDirty => {
1148 RepoTimelineMaterializeMode::FailIfDirty
1149 }
1150 TimelineMaterializeMode::CaptureCurrentThenSeek => {
1151 RepoTimelineMaterializeMode::CaptureCurrentThenSeek
1152 }
1153 }
1154}
1155
1156fn repo_materialize_mode_to_wire(mode: RepoTimelineMaterializeMode) -> &'static str {
1157 match mode {
1158 RepoTimelineMaterializeMode::FailIfDirty => "fail-if-dirty",
1159 RepoTimelineMaterializeMode::CaptureCurrentThenSeek => "capture-current-then-seek",
1160 }
1161}
1162
1163fn repo_materialize_mode_to_wire_code(
1164 mode: RepoTimelineMaterializeMode,
1165) -> TimelineMaterializeMode {
1166 match mode {
1167 RepoTimelineMaterializeMode::FailIfDirty => TimelineMaterializeMode::FailIfDirty,
1168 RepoTimelineMaterializeMode::CaptureCurrentThenSeek => {
1169 TimelineMaterializeMode::CaptureCurrentThenSeek
1170 }
1171 }
1172}
1173
1174fn materialize_status_to_wire_code(
1175 status: &TimelineMaterializeStatus,
1176) -> WireTimelineMaterializeStatus {
1177 match status {
1178 TimelineMaterializeStatus::Materialized => WireTimelineMaterializeStatus::Materialized,
1179 TimelineMaterializeStatus::AlreadyAtTarget => {
1180 WireTimelineMaterializeStatus::AlreadyAtTarget
1181 }
1182 TimelineMaterializeStatus::Refused => WireTimelineMaterializeStatus::Refused,
1183 TimelineMaterializeStatus::Unsupported => WireTimelineMaterializeStatus::Unsupported,
1184 TimelineMaterializeStatus::RecoveryBlocked => {
1185 WireTimelineMaterializeStatus::RecoveryBlocked
1186 }
1187 }
1188}
1189
1190fn recovery_status_to_wire_code(
1191 recovery: &TimelineMaterializationRecoveryOutcome,
1192) -> WireTimelineMaterializationRecoveryStatus {
1193 match recovery.status {
1194 TimelineMaterializationRecoveryStatus::NoPending => {
1195 WireTimelineMaterializationRecoveryStatus::NoPending
1196 }
1197 TimelineMaterializationRecoveryStatus::CursorRecorded => {
1198 WireTimelineMaterializationRecoveryStatus::CursorRecorded
1199 }
1200 TimelineMaterializationRecoveryStatus::AlreadyApplied => {
1201 WireTimelineMaterializationRecoveryStatus::AlreadyApplied
1202 }
1203 TimelineMaterializationRecoveryStatus::Blocked => {
1204 WireTimelineMaterializationRecoveryStatus::Blocked
1205 }
1206 }
1207}
1208
1209fn recovery_blockers_to_wire_details(
1210 recovery: &TimelineMaterializationRecoveryOutcome,
1211) -> Vec<WireTimelineMaterializationRecoveryBlocker> {
1212 recovery
1213 .blocker
1214 .iter()
1215 .map(recovery_blocker_to_wire_detail)
1216 .collect()
1217}
1218
1219fn recovery_blocker_to_wire(blocker: &TimelineMaterializationRecoveryBlocker) -> String {
1220 match blocker {
1221 TimelineMaterializationRecoveryBlocker::CheckoutNotAtTarget {
1222 checkout_state,
1223 target_state,
1224 } => {
1225 let checkout = checkout_state
1226 .map(|state| state.to_string_full())
1227 .unwrap_or_else(|| "unknown".to_string());
1228 format!(
1229 "pending timeline materialization target is {}, but checkout is at {}",
1230 target_state.to_string_full(),
1231 checkout
1232 )
1233 }
1234 }
1235}
1236
1237fn recovery_blocker_to_wire_detail(
1238 blocker: &TimelineMaterializationRecoveryBlocker,
1239) -> WireTimelineMaterializationRecoveryBlocker {
1240 match blocker {
1241 TimelineMaterializationRecoveryBlocker::CheckoutNotAtTarget {
1242 checkout_state,
1243 target_state,
1244 } => WireTimelineMaterializationRecoveryBlocker {
1245 kind: TimelineMaterializationRecoveryBlockerKind::CheckoutNotAtTarget as i32,
1246 message: recovery_blocker_to_wire(blocker),
1247 checkout_state: checkout_state
1248 .as_ref()
1249 .map(|state| state.as_bytes().to_vec())
1250 .unwrap_or_default(),
1251 checkout_state_display_id: checkout_state
1252 .as_ref()
1253 .map(|state| state.to_string_full())
1254 .unwrap_or_default(),
1255 checkout_state_known: checkout_state.is_some(),
1256 target_state: target_state.as_bytes().to_vec(),
1257 target_state_display_id: target_state.to_string_full(),
1258 },
1259 }
1260}
1261
1262fn blockers_to_wire_details(
1263 blockers: &[TimelineMaterializationBlocker],
1264) -> Vec<WireTimelineMaterializationBlocker> {
1265 blockers.iter().map(blocker_to_wire_detail).collect()
1266}
1267
1268fn blocker_to_wire(blocker: &TimelineMaterializationBlocker) -> String {
1269 match blocker {
1270 TimelineMaterializationBlocker::UnsupportedMode(mode) => format!(
1271 "timeline materialization mode '{}' is not supported",
1272 repo_materialize_mode_to_wire(*mode)
1273 ),
1274 TimelineMaterializationBlocker::DirtyWorktree { paths } if paths.is_empty() => {
1275 "worktree has local changes".to_string()
1276 }
1277 TimelineMaterializationBlocker::DirtyWorktree { paths } => {
1278 format!("worktree has local changes: {}", paths.join(", "))
1279 }
1280 TimelineMaterializationBlocker::CheckoutStateUnknown => {
1281 "checkout state is unknown".to_string()
1282 }
1283 TimelineMaterializationBlocker::MissingTree(state) => {
1284 format!("tree for state {} is unavailable", state.to_string_full())
1285 }
1286 }
1287}
1288
1289fn blocker_to_wire_detail(
1290 blocker: &TimelineMaterializationBlocker,
1291) -> WireTimelineMaterializationBlocker {
1292 match blocker {
1293 TimelineMaterializationBlocker::UnsupportedMode(mode) => {
1294 WireTimelineMaterializationBlocker {
1295 kind: TimelineMaterializationBlockerKind::UnsupportedMode as i32,
1296 message: blocker_to_wire(blocker),
1297 unsupported_mode: repo_materialize_mode_to_wire_code(*mode) as i32,
1298 paths: Vec::new(),
1299 state: Vec::new(),
1300 state_display_id: String::new(),
1301 }
1302 }
1303 TimelineMaterializationBlocker::DirtyWorktree { paths } => {
1304 WireTimelineMaterializationBlocker {
1305 kind: TimelineMaterializationBlockerKind::DirtyWorktree as i32,
1306 message: blocker_to_wire(blocker),
1307 unsupported_mode: TimelineMaterializeMode::Unspecified as i32,
1308 paths: paths.clone(),
1309 state: Vec::new(),
1310 state_display_id: String::new(),
1311 }
1312 }
1313 TimelineMaterializationBlocker::CheckoutStateUnknown => {
1314 WireTimelineMaterializationBlocker {
1315 kind: TimelineMaterializationBlockerKind::CheckoutStateUnknown as i32,
1316 message: blocker_to_wire(blocker),
1317 unsupported_mode: TimelineMaterializeMode::Unspecified as i32,
1318 paths: Vec::new(),
1319 state: Vec::new(),
1320 state_display_id: String::new(),
1321 }
1322 }
1323 TimelineMaterializationBlocker::MissingTree(state) => WireTimelineMaterializationBlocker {
1324 kind: TimelineMaterializationBlockerKind::MissingTree as i32,
1325 message: blocker_to_wire(blocker),
1326 unsupported_mode: TimelineMaterializeMode::Unspecified as i32,
1327 paths: Vec::new(),
1328 state: state.as_bytes().to_vec(),
1329 state_display_id: state.to_string_full(),
1330 },
1331 }
1332}
1333
1334fn write_cursor_move(
1335 store: &TimelineStore,
1336 target: &TimelineSeekTarget,
1337 view: &TimelineView,
1338 reason: TimelineCursorMoveReason,
1339) -> Result<TimelineCursorMoveResponse, Status> {
1340 let status = view.status(&target.thread);
1341 let from_step_id = status.and_then(|status| status.current_step_id.clone());
1342 let from_state = status
1343 .and_then(|status| status.current_state)
1344 .unwrap_or(target.state);
1345 let id = store
1346 .record_cursor_move(TimelineCursorMoveRecord {
1347 thread: target.thread.clone(),
1348 branch_id: target.branch_id.clone(),
1349 from_step_id,
1350 to_step_id: target.step_id.clone(),
1351 from_state,
1352 to_state: target.state,
1353 reason,
1354 moved_at_ms: now_ms(),
1355 labels: Vec::new(),
1356 })
1357 .map_err(to_status)?;
1358 let record = record_from_store(store, id)?;
1359 let view = TimelineView::rebuild(store).map_err(to_status)?;
1360 Ok(TimelineCursorMoveResponse {
1361 status: Some(status_for_thread(&view, &target.thread)),
1362 operation: Some(record),
1363 })
1364}
1365
1366fn write_timeline_envelope(
1367 store: &TimelineStore,
1368 envelope: TimelineOperationEnvelope,
1369) -> Result<AgentTimelineOperationRecord, Status> {
1370 let bytes = envelope
1371 .encode()
1372 .map_err(|err| Status::invalid_argument(err.to_string()))?;
1373 let id = store.write_operation_bytes(&bytes).map_err(to_status)?;
1374 record_from_envelope(id, envelope, bytes)
1375}
1376
1377fn parse_seek_reason(value: &str) -> Result<TimelineCursorMoveReason, Status> {
1378 if value.is_empty() || value == "seek-step" {
1379 return Ok(TimelineCursorMoveReason::SeekToolCall);
1380 }
1381 parse_cursor_reason(value)
1382}
1383
1384fn parse_branch_reason_or_default(value: &str) -> Result<TimelineBranchReason, Status> {
1385 if value.is_empty() {
1386 return Ok(TimelineBranchReason::ExplicitFork);
1387 }
1388 parse_branch_reason(value)
1389}
1390
1391fn now_ms() -> i64 {
1392 SystemTime::now()
1393 .duration_since(UNIX_EPOCH)
1394 .map(|duration| duration.as_millis() as i64)
1395 .unwrap_or_default()
1396}
1397
1398fn open_timeline_store_and_view(
1399 inner: &GrpcLocalService,
1400) -> Result<(TimelineStore, TimelineView), Status> {
1401 let store = TimelineStore::open(inner.repo().heddle_dir()).map_err(to_status)?;
1402 let view = TimelineView::rebuild(&store).map_err(to_status)?;
1403 Ok((store, view))
1404}
1405
1406fn record_from_store(
1407 store: &TimelineStore,
1408 id: TimelineOperationId,
1409) -> Result<AgentTimelineOperationRecord, Status> {
1410 let bytes = store
1411 .read_operation_bytes(&id)
1412 .map_err(to_status)?
1413 .ok_or_else(|| {
1414 Status::internal(format!(
1415 "timeline operation {} was not persisted",
1416 id.short()
1417 ))
1418 })?;
1419 let envelope = TimelineOperationEnvelope::decode(&bytes)
1420 .map_err(|err| Status::internal(format!("decode stored timeline operation: {err}")))?;
1421 record_from_envelope(id, envelope, bytes)
1422}
1423
1424fn status_for_thread(view: &TimelineView, thread: &str) -> AgentTimelineStatus {
1425 let status = view.status(thread);
1426 AgentTimelineStatus {
1427 thread: thread.to_string(),
1428 current_branch_id: status
1429 .and_then(|status| status.current_branch_id.as_ref())
1430 .map(|id| id.to_string())
1431 .unwrap_or_default(),
1432 current_step_id: status
1433 .and_then(|status| status.current_step_id.as_ref())
1434 .map(|id| id.to_string())
1435 .unwrap_or_default(),
1436 current_state: status.and_then(|status| state_summary_for_status(view, status)),
1437 branch_count: view.branch_count(thread) as u32,
1438 step_count: view.step_count(thread) as u32,
1439 }
1440}
1441
1442fn state_summary_for_status(
1443 view: &TimelineView,
1444 status: &TimelineThreadStatus,
1445) -> Option<AgentTimelineStateSummary> {
1446 let state = status.current_state?;
1447 let step = status
1448 .current_step_id
1449 .as_ref()
1450 .and_then(|step_id| view.step(&status.thread, step_id));
1451 Some(AgentTimelineStateSummary {
1452 state_id: state.as_bytes().to_vec(),
1453 display_id: state.to_string_full(),
1454 source_step_id: status
1455 .current_step_id
1456 .as_ref()
1457 .map(|id| id.to_string())
1458 .unwrap_or_default(),
1459 payload: step.and_then(step_payload_to_proto),
1460 })
1461}
1462
1463fn step_summary_to_proto(step: &TimelineStepSummary) -> AgentTimelineStepSummary {
1464 AgentTimelineStepSummary {
1465 thread: step.thread.clone(),
1466 step_id: step.step_id.to_string(),
1467 branch_id: step.branch_id.to_string(),
1468 parent_step_id: step
1469 .parent_step_id
1470 .as_ref()
1471 .map(|id| id.to_string())
1472 .unwrap_or_default(),
1473 native: step.native.clone().map(native_to_proto),
1474 tool_name: step.tool_name.clone().unwrap_or_default(),
1475 status: step
1476 .status
1477 .as_ref()
1478 .map(tool_call_status_to_wire)
1479 .unwrap_or_default()
1480 .to_string(),
1481 changed: step.changed.unwrap_or(false),
1482 touched_paths: step.touched_paths.clone(),
1483 before_state: step
1484 .before_state
1485 .map(|state| state.as_bytes().to_vec())
1486 .unwrap_or_default(),
1487 after_state: step
1488 .after_state
1489 .map(|state| state.as_bytes().to_vec())
1490 .unwrap_or_default(),
1491 capture_state: step
1492 .capture_state
1493 .map(|state| state.as_bytes().to_vec())
1494 .unwrap_or_default(),
1495 payload: step_payload_to_proto(step),
1496 labels: step.labels.iter().map(label_to_wire).collect(),
1497 started_at_ms: step.started_at_ms.unwrap_or_default(),
1498 finished_at_ms: step.finished_at_ms.unwrap_or_default(),
1499 operation_ids: step
1500 .operation_ids
1501 .iter()
1502 .map(|id| id.as_bytes().to_vec())
1503 .collect(),
1504 operation_display_ids: step
1505 .operation_ids
1506 .iter()
1507 .map(TimelineOperationId::to_string_full)
1508 .collect(),
1509 capture_oplog_batch_id: step.capture_oplog_batch_id,
1510 }
1511}
1512
1513fn step_payload_to_proto(step: &TimelineStepSummary) -> Option<AgentTimelineToolPayload> {
1514 if step.payload_summary.is_none() && step.payload_hash.is_none() {
1515 return None;
1516 }
1517 Some(AgentTimelineToolPayload {
1518 summary: step.payload_summary.clone().unwrap_or_default(),
1519 hash: step
1520 .payload_hash
1521 .map(|hash| hash.as_bytes().to_vec())
1522 .unwrap_or_default(),
1523 })
1524}
1525
1526fn native_key_from_parts(req: &SeekTimelineToNativeToolCallRequest) -> TimelineNativeToolKey {
1527 TimelineNativeToolKey {
1528 harness: req.harness.clone(),
1529 session_id: non_empty(req.session_id.clone()),
1530 message_id: non_empty(req.message_id.clone()),
1531 tool_call_id: req.tool_call_id.clone(),
1532 }
1533}
1534
1535fn native_key_from_resolve_request(req: &ResolveNativeToolCallRequest) -> TimelineNativeToolKey {
1536 TimelineNativeToolKey {
1537 harness: req.harness.clone(),
1538 session_id: non_empty(req.session_id.clone()),
1539 message_id: non_empty(req.message_id.clone()),
1540 tool_call_id: req.tool_call_id.clone(),
1541 }
1542}
1543
1544fn draft_to_envelope(
1545 draft: AgentTimelineOperationDraft,
1546) -> Result<TimelineOperationEnvelope, Status> {
1547 let labels = draft
1548 .labels
1549 .iter()
1550 .map(|label| parse_label(label))
1551 .collect::<Result<Vec<_>, _>>()?;
1552 let body = match draft
1553 .body
1554 .ok_or_else(|| Status::invalid_argument("operation body is required"))?
1555 {
1556 agent_timeline_operation_draft::Body::ToolCallStarted(body) => {
1557 TimelineOperationBodyV1::ToolCallStarted(tool_call_started_from_proto(body)?)
1558 }
1559 agent_timeline_operation_draft::Body::ToolCallFinished(body) => {
1560 TimelineOperationBodyV1::ToolCallFinished(tool_call_finished_from_proto(body)?)
1561 }
1562 agent_timeline_operation_draft::Body::CursorMoved(body) => {
1563 TimelineOperationBodyV1::CursorMoved(cursor_moved_from_proto(body)?)
1564 }
1565 agent_timeline_operation_draft::Body::BranchCreated(body) => {
1566 TimelineOperationBodyV1::BranchCreated(branch_created_from_proto(body)?)
1567 }
1568 };
1569 Ok(TimelineOperationEnvelope::new(body, labels))
1570}
1571
1572fn record_from_envelope(
1573 id: TimelineOperationId,
1574 envelope: TimelineOperationEnvelope,
1575 bytes: Vec<u8>,
1576) -> Result<AgentTimelineOperationRecord, Status> {
1577 let body = match envelope.body {
1578 TimelineOperationBodyV1::ToolCallStarted(body) => {
1579 agent_timeline_operation_record::Body::ToolCallStarted(tool_call_started_to_proto(body))
1580 }
1581 TimelineOperationBodyV1::ToolCallFinished(body) => {
1582 agent_timeline_operation_record::Body::ToolCallFinished(tool_call_finished_to_proto(
1583 body,
1584 ))
1585 }
1586 TimelineOperationBodyV1::CursorMoved(body) => {
1587 agent_timeline_operation_record::Body::CursorMoved(cursor_moved_to_proto(body))
1588 }
1589 TimelineOperationBodyV1::BranchCreated(body) => {
1590 agent_timeline_operation_record::Body::BranchCreated(branch_created_to_proto(body))
1591 }
1592 };
1593 Ok(AgentTimelineOperationRecord {
1594 operation_id: id.as_bytes().to_vec(),
1595 display_id: id.to_string_full(),
1596 schema_version: envelope.schema_version.into(),
1597 kind: envelope.kind.as_str().to_string(),
1598 labels: envelope.labels.iter().map(label_to_wire).collect(),
1599 body: Some(body),
1600 envelope: bytes,
1601 })
1602}
1603
1604fn tool_call_started_from_proto(
1605 body: AgentTimelineToolCallStarted,
1606) -> Result<ToolCallStartedV1, Status> {
1607 Ok(ToolCallStartedV1 {
1608 thread: body.thread,
1609 step_id: TimelineStepId::new(body.step_id),
1610 branch_id: TimelineBranchId::new(body.branch_id),
1611 parent_step_id: optional_step_id(body.parent_step_id),
1612 native: native_from_proto(body.native)?,
1613 tool_name: body.tool_name,
1614 before_state: change_id_from_bytes(&body.before_state, "before_state")?,
1615 payload: payload_from_proto(body.payload)?,
1616 started_at_ms: body.started_at_ms,
1617 })
1618}
1619
1620fn tool_call_finished_from_proto(
1621 body: AgentTimelineToolCallFinished,
1622) -> Result<ToolCallFinishedV1, Status> {
1623 Ok(ToolCallFinishedV1 {
1624 thread: body.thread,
1625 step_id: TimelineStepId::new(body.step_id),
1626 branch_id: TimelineBranchId::new(body.branch_id),
1627 native: native_from_proto(body.native)?,
1628 status: parse_tool_call_status(&body.status)?,
1629 before_state: change_id_from_bytes(&body.before_state, "before_state")?,
1630 after_state: change_id_from_bytes(&body.after_state, "after_state")?,
1631 capture_state: optional_change_id(body.capture_state, "capture_state")?,
1632 capture_oplog_batch_id: body.capture_oplog_batch_id,
1633 changed: body.changed,
1634 touched_paths: body.touched_paths,
1635 payload: payload_from_proto(body.payload)?,
1636 finished_at_ms: body.finished_at_ms,
1637 })
1638}
1639
1640fn cursor_moved_from_proto(body: AgentTimelineCursorMoved) -> Result<CursorMovedV1, Status> {
1641 Ok(CursorMovedV1 {
1642 thread: body.thread,
1643 branch_id: TimelineBranchId::new(body.branch_id),
1644 from_step_id: optional_step_id(body.from_step_id),
1645 to_step_id: optional_step_id(body.to_step_id),
1646 from_state: change_id_from_bytes(&body.from_state, "from_state")?,
1647 to_state: change_id_from_bytes(&body.to_state, "to_state")?,
1648 reason: parse_cursor_reason(&body.reason)?,
1649 moved_at_ms: body.moved_at_ms,
1650 })
1651}
1652
1653fn branch_created_from_proto(body: AgentTimelineBranchCreated) -> Result<BranchCreatedV1, Status> {
1654 Ok(BranchCreatedV1 {
1655 thread: body.thread,
1656 branch_id: TimelineBranchId::new(body.branch_id),
1657 parent_branch_id: optional_branch_id(body.parent_branch_id),
1658 from_step_id: optional_step_id(body.from_step_id),
1659 from_state: change_id_from_bytes(&body.from_state, "from_state")?,
1660 reason: parse_branch_reason(&body.reason)?,
1661 created_at_ms: body.created_at_ms,
1662 })
1663}
1664
1665fn tool_call_started_to_proto(body: ToolCallStartedV1) -> AgentTimelineToolCallStarted {
1666 AgentTimelineToolCallStarted {
1667 thread: body.thread,
1668 step_id: body.step_id.to_string(),
1669 branch_id: body.branch_id.to_string(),
1670 parent_step_id: body
1671 .parent_step_id
1672 .map(|id| id.to_string())
1673 .unwrap_or_default(),
1674 native: Some(native_to_proto(body.native)),
1675 tool_name: body.tool_name,
1676 before_state: body.before_state.as_bytes().to_vec(),
1677 payload: payload_to_proto(body.payload),
1678 started_at_ms: body.started_at_ms,
1679 }
1680}
1681
1682fn tool_call_finished_to_proto(body: ToolCallFinishedV1) -> AgentTimelineToolCallFinished {
1683 AgentTimelineToolCallFinished {
1684 thread: body.thread,
1685 step_id: body.step_id.to_string(),
1686 branch_id: body.branch_id.to_string(),
1687 native: Some(native_to_proto(body.native)),
1688 status: tool_call_status_to_wire(&body.status).to_string(),
1689 before_state: body.before_state.as_bytes().to_vec(),
1690 after_state: body.after_state.as_bytes().to_vec(),
1691 capture_state: body
1692 .capture_state
1693 .map(|state| state.as_bytes().to_vec())
1694 .unwrap_or_default(),
1695 capture_oplog_batch_id: body.capture_oplog_batch_id,
1696 changed: body.changed,
1697 touched_paths: body.touched_paths,
1698 payload: payload_to_proto(body.payload),
1699 finished_at_ms: body.finished_at_ms,
1700 }
1701}
1702
1703fn cursor_moved_to_proto(body: CursorMovedV1) -> AgentTimelineCursorMoved {
1704 AgentTimelineCursorMoved {
1705 thread: body.thread,
1706 branch_id: body.branch_id.to_string(),
1707 from_step_id: body
1708 .from_step_id
1709 .map(|id| id.to_string())
1710 .unwrap_or_default(),
1711 to_step_id: body.to_step_id.map(|id| id.to_string()).unwrap_or_default(),
1712 from_state: body.from_state.as_bytes().to_vec(),
1713 to_state: body.to_state.as_bytes().to_vec(),
1714 reason: cursor_reason_to_wire(&body.reason).to_string(),
1715 moved_at_ms: body.moved_at_ms,
1716 }
1717}
1718
1719fn branch_created_to_proto(body: BranchCreatedV1) -> AgentTimelineBranchCreated {
1720 AgentTimelineBranchCreated {
1721 thread: body.thread,
1722 branch_id: body.branch_id.to_string(),
1723 parent_branch_id: body
1724 .parent_branch_id
1725 .map(|id| id.to_string())
1726 .unwrap_or_default(),
1727 from_step_id: body
1728 .from_step_id
1729 .map(|id| id.to_string())
1730 .unwrap_or_default(),
1731 from_state: body.from_state.as_bytes().to_vec(),
1732 reason: branch_reason_to_wire(&body.reason).to_string(),
1733 created_at_ms: body.created_at_ms,
1734 }
1735}
1736
1737fn optional_step_id(value: String) -> Option<TimelineStepId> {
1738 (!value.is_empty()).then(|| TimelineStepId::new(value))
1739}
1740
1741fn optional_branch_id(value: String) -> Option<TimelineBranchId> {
1742 (!value.is_empty()).then(|| TimelineBranchId::new(value))
1743}
1744
1745fn native_from_proto(
1746 native: Option<AgentTimelineNativeToolCall>,
1747) -> Result<NativeToolCallRefV1, Status> {
1748 let native = native.ok_or_else(|| Status::invalid_argument("native tool call is required"))?;
1749 if native.harness.trim().is_empty() {
1750 return Err(Status::invalid_argument("native.harness must not be empty"));
1751 }
1752 if native.tool_call_id.trim().is_empty() {
1753 return Err(Status::invalid_argument(
1754 "native.tool_call_id must not be empty",
1755 ));
1756 }
1757 Ok(NativeToolCallRefV1 {
1758 harness: native.harness,
1759 session_id: non_empty(native.session_id),
1760 message_id: non_empty(native.message_id),
1761 tool_call_id: native.tool_call_id,
1762 })
1763}
1764
1765fn native_to_proto(native: NativeToolCallRefV1) -> AgentTimelineNativeToolCall {
1766 AgentTimelineNativeToolCall {
1767 harness: native.harness,
1768 session_id: native.session_id.unwrap_or_default(),
1769 message_id: native.message_id.unwrap_or_default(),
1770 tool_call_id: native.tool_call_id,
1771 }
1772}
1773
1774fn payload_from_proto(
1775 payload: Option<AgentTimelineToolPayload>,
1776) -> Result<Option<TimelineToolPayloadMetadata>, Status> {
1777 let Some(payload) = payload else {
1778 return Ok(None);
1779 };
1780 Ok(Some(TimelineToolPayloadMetadata {
1781 summary: non_empty(payload.summary),
1782 hash: optional_content_hash(payload.hash, "payload.hash")?,
1783 }))
1784}
1785
1786fn payload_to_proto(
1787 payload: Option<TimelineToolPayloadMetadata>,
1788) -> Option<AgentTimelineToolPayload> {
1789 payload.map(|payload| AgentTimelineToolPayload {
1790 summary: payload.summary.unwrap_or_default(),
1791 hash: payload
1792 .hash
1793 .map(|hash| hash.as_bytes().to_vec())
1794 .unwrap_or_default(),
1795 })
1796}
1797
1798fn change_id_from_bytes(bytes: &[u8], field: &str) -> Result<ChangeId, Status> {
1799 ChangeId::try_from_slice(bytes)
1800 .map_err(|err| Status::invalid_argument(format!("invalid {field}: {err}")))
1801}
1802
1803fn optional_change_id(bytes: Vec<u8>, field: &str) -> Result<Option<ChangeId>, Status> {
1804 if bytes.is_empty() {
1805 return Ok(None);
1806 }
1807 change_id_from_bytes(&bytes, field).map(Some)
1808}
1809
1810fn optional_content_hash(bytes: Vec<u8>, field: &str) -> Result<Option<ContentHash>, Status> {
1811 if bytes.is_empty() {
1812 return Ok(None);
1813 }
1814 if bytes.len() != 32 {
1815 return Err(Status::invalid_argument(format!(
1816 "invalid {field}: expected 32 bytes"
1817 )));
1818 }
1819 let mut arr = [0u8; 32];
1820 arr.copy_from_slice(&bytes);
1821 Ok(Some(ContentHash::from_bytes(arr)))
1822}
1823
1824fn non_empty(value: String) -> Option<String> {
1825 (!value.is_empty()).then_some(value)
1826}
1827
1828fn parse_label(value: &str) -> Result<TimelineLabel, Status> {
1829 match value {
1830 "repo-reversible" => Ok(TimelineLabel::RepoReversible),
1831 "external-side-effects-unknown" => Ok(TimelineLabel::ExternalSideEffectsUnknown),
1832 "ignored-path-touched" => Ok(TimelineLabel::IgnoredPathTouched),
1833 "outside-repo-touched" => Ok(TimelineLabel::OutsideRepoTouched),
1834 "purge-boundary" => Ok(TimelineLabel::PurgeBoundary),
1835 "capture-failed" => Ok(TimelineLabel::CaptureFailed),
1836 other => Err(Status::invalid_argument(format!(
1837 "unknown timeline label '{other}'"
1838 ))),
1839 }
1840}
1841
1842fn label_to_wire(label: &TimelineLabel) -> String {
1843 match label {
1844 TimelineLabel::RepoReversible => "repo-reversible",
1845 TimelineLabel::ExternalSideEffectsUnknown => "external-side-effects-unknown",
1846 TimelineLabel::IgnoredPathTouched => "ignored-path-touched",
1847 TimelineLabel::OutsideRepoTouched => "outside-repo-touched",
1848 TimelineLabel::PurgeBoundary => "purge-boundary",
1849 TimelineLabel::CaptureFailed => "capture-failed",
1850 }
1851 .to_string()
1852}
1853
1854fn parse_tool_call_status(value: &str) -> Result<TimelineToolCallStatus, Status> {
1855 match value {
1856 "succeeded" => Ok(TimelineToolCallStatus::Succeeded),
1857 "failed" => Ok(TimelineToolCallStatus::Failed),
1858 "cancelled" | "canceled" => Ok(TimelineToolCallStatus::Cancelled),
1859 other => Err(Status::invalid_argument(format!(
1860 "unknown timeline tool call status '{other}'"
1861 ))),
1862 }
1863}
1864
1865fn tool_call_status_to_wire(status: &TimelineToolCallStatus) -> &'static str {
1866 match status {
1867 TimelineToolCallStatus::Succeeded => "succeeded",
1868 TimelineToolCallStatus::Failed => "failed",
1869 TimelineToolCallStatus::Cancelled => "cancelled",
1870 }
1871}
1872
1873fn parse_cursor_reason(value: &str) -> Result<TimelineCursorMoveReason, Status> {
1874 match value {
1875 "seek-tool-call" => Ok(TimelineCursorMoveReason::SeekToolCall),
1876 "undo" => Ok(TimelineCursorMoveReason::Undo),
1877 "redo" => Ok(TimelineCursorMoveReason::Redo),
1878 "reset" => Ok(TimelineCursorMoveReason::Reset),
1879 "auto-advance" => Ok(TimelineCursorMoveReason::AutoAdvance),
1880 other => Err(Status::invalid_argument(format!(
1881 "unknown timeline cursor move reason '{other}'"
1882 ))),
1883 }
1884}
1885
1886fn cursor_reason_to_wire(reason: &TimelineCursorMoveReason) -> &'static str {
1887 match reason {
1888 TimelineCursorMoveReason::SeekToolCall => "seek-tool-call",
1889 TimelineCursorMoveReason::Undo => "undo",
1890 TimelineCursorMoveReason::Redo => "redo",
1891 TimelineCursorMoveReason::Reset => "reset",
1892 TimelineCursorMoveReason::AutoAdvance => "auto-advance",
1893 }
1894}
1895
1896fn parse_branch_reason(value: &str) -> Result<TimelineBranchReason, Status> {
1897 match value {
1898 "edit-from-rewound-cursor" => Ok(TimelineBranchReason::EditFromRewoundCursor),
1899 "explicit-fork" => Ok(TimelineBranchReason::ExplicitFork),
1900 "retry" => Ok(TimelineBranchReason::Retry),
1901 "fan-out" => Ok(TimelineBranchReason::FanOut),
1902 other => Err(Status::invalid_argument(format!(
1903 "unknown timeline branch reason '{other}'"
1904 ))),
1905 }
1906}
1907
1908fn branch_reason_to_wire(reason: &TimelineBranchReason) -> &'static str {
1909 match reason {
1910 TimelineBranchReason::EditFromRewoundCursor => "edit-from-rewound-cursor",
1911 TimelineBranchReason::ExplicitFork => "explicit-fork",
1912 TimelineBranchReason::Retry => "retry",
1913 TimelineBranchReason::FanOut => "fan-out",
1914 }
1915}
1916
1917#[cfg(test)]
1918mod tests {
1919 use std::{fs, sync::Arc};
1920
1921 use grpc::heddle::v1::{
1922 AgentTimelineNativeToolCall, AgentTimelineOperationDraft, AgentTimelineToolCallFinished,
1923 AgentTimelineToolCallStarted, ForkTimelineFromSelectorRequest,
1924 GetTimelineNavigationRequest, GetTimelineOperationRequest, GetTimelineStatusRequest,
1925 ListTimelineStepsRequest, MaterializeTimelineCursorRequest, PreviewTimelineSeekRequest,
1926 RecordTimelineOperationRequest, RecoverTimelineMaterializationRequest,
1927 ResetTimelineCursorRequest, ResolveNativeToolCallRequest,
1928 SeekTimelineToNativeToolCallRequest, SeekTimelineToStepRequest, TimelineCursorSelector,
1929 TimelineMaterializeMode, TimelineSeekNativeToolCallSelector, TimelineSeekSelector,
1930 TimelineSeekStepSelector, agent_timeline_operation_draft, timeline_seek_selector,
1931 timeline_service_server::TimelineService,
1932 };
1933 use repo::{Repository, operation_dedup::OperationDedupStore};
1934 use tempfile::TempDir;
1935 use tonic::Request;
1936
1937 use super::*;
1938
1939 fn fresh_service() -> (TempDir, LocalTimelineService) {
1940 let temp = TempDir::new().unwrap();
1941 let repo = Repository::init_default(temp.path()).unwrap();
1942 let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
1943 let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
1944 (temp, LocalTimelineService::new(inner))
1945 }
1946
1947 async fn record_finished_step(
1948 service: &LocalTimelineService,
1949 step_id: &str,
1950 tool_call_id: &str,
1951 before: u8,
1952 after: u8,
1953 finished_at_ms: i64,
1954 ) {
1955 service
1956 .record_operation(Request::new(RecordTimelineOperationRequest {
1957 repo_path: String::new(),
1958 operation: Some(AgentTimelineOperationDraft {
1959 labels: vec!["repo-reversible".to_string()],
1960 body: Some(agent_timeline_operation_draft::Body::ToolCallFinished(
1961 AgentTimelineToolCallFinished {
1962 thread: "main".to_string(),
1963 step_id: step_id.to_string(),
1964 branch_id: "tlb-main".to_string(),
1965 native: Some(AgentTimelineNativeToolCall {
1966 harness: "opencode".to_string(),
1967 session_id: "session-1".to_string(),
1968 message_id: "message-1".to_string(),
1969 tool_call_id: tool_call_id.to_string(),
1970 }),
1971 status: "succeeded".to_string(),
1972 before_state: vec![before; 16],
1973 after_state: vec![after; 16],
1974 capture_state: vec![after; 16],
1975 capture_oplog_batch_id: Some(finished_at_ms as u64),
1976 changed: true,
1977 touched_paths: vec![format!("src/{step_id}.rs")],
1978 payload: None,
1979 finished_at_ms,
1980 },
1981 )),
1982 }),
1983 client_operation_id: String::new(),
1984 }))
1985 .await
1986 .unwrap();
1987 }
1988
1989 fn write_repo_state(
1990 service: &LocalTimelineService,
1991 root: &std::path::Path,
1992 path: &str,
1993 content: &str,
1994 ) -> ChangeId {
1995 fs::write(root.join(path), content).unwrap();
1996 service
1997 .inner
1998 .repo()
1999 .snapshot(Some(path.to_string()), None)
2000 .unwrap()
2001 .change_id
2002 }
2003
2004 async fn record_finished_step_for_states(
2005 service: &LocalTimelineService,
2006 step_id: &str,
2007 tool_call_id: &str,
2008 before: ChangeId,
2009 after: ChangeId,
2010 touched_path: &str,
2011 finished_at_ms: i64,
2012 ) {
2013 service
2014 .record_operation(Request::new(RecordTimelineOperationRequest {
2015 repo_path: String::new(),
2016 operation: Some(AgentTimelineOperationDraft {
2017 labels: vec!["repo-reversible".to_string()],
2018 body: Some(agent_timeline_operation_draft::Body::ToolCallFinished(
2019 AgentTimelineToolCallFinished {
2020 thread: "main".to_string(),
2021 step_id: step_id.to_string(),
2022 branch_id: "tlb-main".to_string(),
2023 native: Some(AgentTimelineNativeToolCall {
2024 harness: "opencode".to_string(),
2025 session_id: "session-1".to_string(),
2026 message_id: "message-1".to_string(),
2027 tool_call_id: tool_call_id.to_string(),
2028 }),
2029 status: "succeeded".to_string(),
2030 before_state: before.as_bytes().to_vec(),
2031 after_state: after.as_bytes().to_vec(),
2032 capture_state: after.as_bytes().to_vec(),
2033 capture_oplog_batch_id: Some(finished_at_ms as u64),
2034 changed: true,
2035 touched_paths: vec![touched_path.to_string()],
2036 payload: None,
2037 finished_at_ms,
2038 },
2039 )),
2040 }),
2041 client_operation_id: String::new(),
2042 }))
2043 .await
2044 .unwrap();
2045 }
2046
2047 #[tokio::test]
2048 async fn record_operation_stores_canonical_timeline_object() {
2049 let (_temp, service) = fresh_service();
2050 let response = service
2051 .record_operation(Request::new(RecordTimelineOperationRequest {
2052 repo_path: String::new(),
2053 operation: Some(AgentTimelineOperationDraft {
2054 labels: vec!["repo-reversible".to_string()],
2055 body: Some(agent_timeline_operation_draft::Body::ToolCallStarted(
2056 AgentTimelineToolCallStarted {
2057 thread: "main".to_string(),
2058 step_id: "tls-step".to_string(),
2059 branch_id: "tlb-main".to_string(),
2060 parent_step_id: String::new(),
2061 native: Some(AgentTimelineNativeToolCall {
2062 harness: "opencode".to_string(),
2063 session_id: "session-1".to_string(),
2064 message_id: "message-1".to_string(),
2065 tool_call_id: "call-1".to_string(),
2066 }),
2067 tool_name: "shell".to_string(),
2068 before_state: vec![1; 16],
2069 payload: None,
2070 started_at_ms: 1_700_000_000_000,
2071 },
2072 )),
2073 }),
2074 client_operation_id: String::new(),
2075 }))
2076 .await
2077 .unwrap()
2078 .into_inner();
2079
2080 assert_eq!(response.kind, "tool_call_started");
2081 assert_eq!(response.labels, vec!["repo-reversible"]);
2082 assert_eq!(response.schema_version, 1);
2083 assert_eq!(response.operation_id.len(), 32);
2084 assert!(!response.envelope.is_empty());
2085
2086 let read = service
2087 .get_operation(Request::new(GetTimelineOperationRequest {
2088 repo_path: String::new(),
2089 operation_id: response.operation_id.clone(),
2090 }))
2091 .await
2092 .unwrap()
2093 .into_inner();
2094
2095 assert_eq!(read.operation_id, response.operation_id);
2096 assert_eq!(read.envelope, response.envelope);
2097 assert_eq!(read.display_id, response.display_id);
2098 }
2099
2100 #[tokio::test]
2101 async fn list_status_and_seek_follow_recorded_tool_calls() {
2102 let (_temp, service) = fresh_service();
2103 record_finished_step(&service, "tls-step-1", "call-1", 1, 2, 1_700_000_000_000).await;
2104 record_finished_step(&service, "tls-step-2", "call-2", 2, 3, 1_700_000_000_100).await;
2105
2106 let status = service
2107 .get_timeline_status(Request::new(GetTimelineStatusRequest {
2108 repo_path: String::new(),
2109 thread: "main".to_string(),
2110 }))
2111 .await
2112 .unwrap()
2113 .into_inner();
2114 assert_eq!(status.current_branch_id, "tlb-main");
2115 assert_eq!(status.current_step_id, "tls-step-2");
2116 assert_eq!(status.current_state.unwrap().state_id, vec![3; 16]);
2117 assert_eq!(status.step_count, 2);
2118
2119 let listed = service
2120 .list_timeline_steps(Request::new(ListTimelineStepsRequest {
2121 repo_path: String::new(),
2122 thread: "main".to_string(),
2123 branch_id: "tlb-main".to_string(),
2124 limit: 0,
2125 }))
2126 .await
2127 .unwrap()
2128 .into_inner();
2129 assert_eq!(listed.steps.len(), 2);
2130 assert_eq!(listed.steps[0].step_id, "tls-step-1");
2131 assert_eq!(listed.steps[1].step_id, "tls-step-2");
2132 assert_eq!(listed.steps[1].status, "succeeded");
2133 assert_eq!(listed.steps[1].touched_paths, vec!["src/tls-step-2.rs"]);
2134
2135 let resolved = service
2136 .resolve_native_tool_call(Request::new(ResolveNativeToolCallRequest {
2137 repo_path: String::new(),
2138 thread: "main".to_string(),
2139 harness: "opencode".to_string(),
2140 session_id: "session-1".to_string(),
2141 message_id: "message-1".to_string(),
2142 tool_call_id: "call-1".to_string(),
2143 }))
2144 .await
2145 .unwrap()
2146 .into_inner();
2147 assert_eq!(resolved.step_id, "tls-step-1");
2148
2149 let moved = service
2150 .seek_to_step(Request::new(SeekTimelineToStepRequest {
2151 repo_path: String::new(),
2152 thread: "main".to_string(),
2153 branch_id: "tlb-main".to_string(),
2154 step_id: "tls-step-1".to_string(),
2155 reason: "seek-step".to_string(),
2156 client_operation_id: String::new(),
2157 }))
2158 .await
2159 .unwrap()
2160 .into_inner();
2161 assert_eq!(moved.operation.unwrap().kind, "cursor_moved");
2162 let moved_status = moved.status.unwrap();
2163 assert_eq!(moved_status.current_step_id, "tls-step-1");
2164 assert_eq!(moved_status.current_state.unwrap().state_id, vec![2; 16]);
2165
2166 let moved = service
2167 .seek_to_native_tool_call(Request::new(SeekTimelineToNativeToolCallRequest {
2168 repo_path: String::new(),
2169 thread: "main".to_string(),
2170 harness: "opencode".to_string(),
2171 session_id: "session-1".to_string(),
2172 message_id: "message-1".to_string(),
2173 tool_call_id: "call-2".to_string(),
2174 reason: String::new(),
2175 client_operation_id: String::new(),
2176 }))
2177 .await
2178 .unwrap()
2179 .into_inner();
2180 let moved_status = moved.status.unwrap();
2181 assert_eq!(moved_status.current_step_id, "tls-step-2");
2182 assert_eq!(moved_status.current_state.unwrap().state_id, vec![3; 16]);
2183 }
2184
2185 #[tokio::test]
2186 async fn get_timeline_navigation_returns_cursor_actions_and_native_ids() {
2187 let (_temp, service) = fresh_service();
2188 record_finished_step(&service, "tls-step-1", "call-1", 1, 2, 1_700_000_000_000).await;
2189 record_finished_step(&service, "tls-step-2", "call-2", 2, 3, 1_700_000_000_100).await;
2190
2191 let navigation = service
2192 .get_timeline_navigation(Request::new(GetTimelineNavigationRequest {
2193 repo_path: String::new(),
2194 thread: "main".to_string(),
2195 }))
2196 .await
2197 .unwrap()
2198 .into_inner();
2199
2200 assert_eq!(navigation.thread, "main");
2201 assert_eq!(navigation.cursor.unwrap().step_id, "tls-step-2");
2202 assert_eq!(navigation.branches.len(), 1);
2203 assert_eq!(navigation.active_branch_path, vec!["tlb-main"]);
2204 assert!(navigation.actions.as_ref().unwrap().can_undo);
2205 assert!(!navigation.actions.as_ref().unwrap().can_redo);
2206 assert!(navigation.recovery.is_none());
2207
2208 let current = navigation
2209 .steps
2210 .iter()
2211 .find(|step| step.is_current)
2212 .expect("current step");
2213 let summary = current.step.as_ref().unwrap();
2214 assert_eq!(summary.step_id, "tls-step-2");
2215 assert_eq!(summary.native.as_ref().unwrap().harness, "opencode");
2216 assert_eq!(summary.native.as_ref().unwrap().tool_call_id, "call-2");
2217 assert_eq!(current.cursor_state, vec![3; 16]);
2218 }
2219
2220 #[tokio::test]
2221 async fn preview_timeline_seek_resolves_step_and_native_selectors() {
2222 let (temp, service) = fresh_service();
2223 let state0 = service.inner.repo().head().unwrap().unwrap();
2224 let state1 = write_repo_state(&service, temp.path(), "tracked.txt", "one\n");
2225 let state2 = write_repo_state(&service, temp.path(), "tracked.txt", "two\n");
2226 record_finished_step_for_states(
2227 &service,
2228 "tls-step-1",
2229 "call-1",
2230 state0,
2231 state1,
2232 "tracked.txt",
2233 1_700_000_000_000,
2234 )
2235 .await;
2236 record_finished_step_for_states(
2237 &service,
2238 "tls-step-2",
2239 "call-2",
2240 state1,
2241 state2,
2242 "tracked.txt",
2243 1_700_000_000_100,
2244 )
2245 .await;
2246
2247 let by_step = service
2248 .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2249 repo_path: String::new(),
2250 selector: Some(TimelineSeekSelector {
2251 target: Some(timeline_seek_selector::Target::Step(
2252 TimelineSeekStepSelector {
2253 thread: "main".to_string(),
2254 branch_id: "tlb-main".to_string(),
2255 step_id: "tls-step-1".to_string(),
2256 },
2257 )),
2258 }),
2259 mode: TimelineMaterializeMode::FailIfDirty as i32,
2260 }))
2261 .await
2262 .unwrap()
2263 .into_inner();
2264
2265 assert_eq!(
2266 by_step.current_status.unwrap().current_step_id,
2267 "tls-step-2"
2268 );
2269 assert_eq!(by_step.current_state, state2.as_bytes().to_vec());
2270 assert_eq!(by_step.checkout_state, state2.as_bytes().to_vec());
2271 assert_eq!(by_step.target_branch_id, "tlb-main");
2272 assert_eq!(by_step.target_step_id, "tls-step-1");
2273 assert_eq!(by_step.target_state, state1.as_bytes().to_vec());
2274 assert_eq!(by_step.changed_paths, vec!["tracked.txt"]);
2275 assert!(by_step.worktree_dirty_known);
2276 assert!(!by_step.worktree_dirty);
2277 assert!(by_step.materialization_supported);
2278 assert!(by_step.can_materialize);
2279 assert!(by_step.blockers.is_empty());
2280
2281 let by_native = service
2282 .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2283 repo_path: String::new(),
2284 selector: Some(TimelineSeekSelector {
2285 target: Some(timeline_seek_selector::Target::NativeToolCall(
2286 TimelineSeekNativeToolCallSelector {
2287 thread: "main".to_string(),
2288 harness: "opencode".to_string(),
2289 session_id: "session-1".to_string(),
2290 message_id: "message-1".to_string(),
2291 tool_call_id: "call-2".to_string(),
2292 },
2293 )),
2294 }),
2295 mode: TimelineMaterializeMode::FailIfDirty as i32,
2296 }))
2297 .await
2298 .unwrap()
2299 .into_inner();
2300
2301 assert_eq!(by_native.target_step_id, "tls-step-2");
2302 assert_eq!(by_native.target_state, state2.as_bytes().to_vec());
2303
2304 let current_cursor = service
2305 .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2306 repo_path: String::new(),
2307 selector: Some(TimelineSeekSelector {
2308 target: Some(timeline_seek_selector::Target::CurrentCursor(
2309 TimelineCursorSelector {
2310 thread: "main".to_string(),
2311 branch_id: "tlb-main".to_string(),
2312 },
2313 )),
2314 }),
2315 mode: TimelineMaterializeMode::FailIfDirty as i32,
2316 }))
2317 .await
2318 .unwrap()
2319 .into_inner();
2320
2321 assert_eq!(current_cursor.target_step_id, "tls-step-2");
2322 assert_eq!(current_cursor.target_state, state2.as_bytes().to_vec());
2323 assert!(current_cursor.can_materialize);
2324
2325 let unsupported = service
2326 .preview_timeline_seek(Request::new(PreviewTimelineSeekRequest {
2327 repo_path: String::new(),
2328 selector: Some(TimelineSeekSelector {
2329 target: Some(timeline_seek_selector::Target::CurrentCursor(
2330 TimelineCursorSelector {
2331 thread: "main".to_string(),
2332 branch_id: "tlb-main".to_string(),
2333 },
2334 )),
2335 }),
2336 mode: TimelineMaterializeMode::CaptureCurrentThenSeek as i32,
2337 }))
2338 .await
2339 .unwrap()
2340 .into_inner();
2341
2342 assert!(!unsupported.can_materialize);
2343 assert_eq!(unsupported.blockers.len(), 1);
2344 assert_eq!(
2345 unsupported.blockers[0].kind,
2346 TimelineMaterializationBlockerKind::UnsupportedMode as i32
2347 );
2348 assert_eq!(
2349 unsupported.blockers[0].unsupported_mode,
2350 TimelineMaterializeMode::CaptureCurrentThenSeek as i32
2351 );
2352 }
2353
2354 #[tokio::test]
2355 async fn materialize_timeline_cursor_moves_checkout_and_records_cursor() {
2356 let (temp, service) = fresh_service();
2357 let state0 = service.inner.repo().head().unwrap().unwrap();
2358 let state1 = write_repo_state(&service, temp.path(), "tracked.txt", "one\n");
2359 let state2 = write_repo_state(&service, temp.path(), "tracked.txt", "two\n");
2360 record_finished_step_for_states(
2361 &service,
2362 "tls-step-1",
2363 "call-1",
2364 state0,
2365 state1,
2366 "tracked.txt",
2367 1_700_000_000_000,
2368 )
2369 .await;
2370 record_finished_step_for_states(
2371 &service,
2372 "tls-step-2",
2373 "call-2",
2374 state1,
2375 state2,
2376 "tracked.txt",
2377 1_700_000_000_100,
2378 )
2379 .await;
2380
2381 let response = service
2382 .materialize_timeline_cursor(Request::new(MaterializeTimelineCursorRequest {
2383 repo_path: String::new(),
2384 selector: Some(TimelineSeekSelector {
2385 target: Some(timeline_seek_selector::Target::Step(
2386 TimelineSeekStepSelector {
2387 thread: "main".to_string(),
2388 branch_id: "tlb-main".to_string(),
2389 step_id: "tls-step-1".to_string(),
2390 },
2391 )),
2392 }),
2393 mode: TimelineMaterializeMode::FailIfDirty as i32,
2394 client_operation_id: String::new(),
2395 }))
2396 .await
2397 .unwrap()
2398 .into_inner();
2399
2400 assert!(response.materialized);
2401 assert_eq!(
2402 response.status,
2403 WireTimelineMaterializeStatus::Materialized as i32
2404 );
2405 assert_eq!(
2406 response.recovery_status,
2407 WireTimelineMaterializationRecoveryStatus::NoPending as i32
2408 );
2409 assert!(response.recovered_cursor_operation.is_none());
2410 assert!(response.recovery_blockers.is_empty());
2411 assert_eq!(response.cursor_operation.unwrap().kind, "cursor_moved");
2412 assert_eq!(
2413 response.preview.as_ref().unwrap().target_step_id,
2414 "tls-step-1"
2415 );
2416 assert_eq!(
2417 response.updated_status.unwrap().current_step_id,
2418 "tls-step-1",
2419 "materialization should update the logical cursor after moving the checkout"
2420 );
2421 assert!(response.blockers.is_empty());
2422 assert_eq!(service.inner.repo().head().unwrap(), Some(state1));
2423 assert_eq!(
2424 fs::read_to_string(temp.path().join("tracked.txt")).unwrap(),
2425 "one\n"
2426 );
2427 }
2428
2429 #[tokio::test]
2430 async fn timeline_action_rpcs_return_navigation_and_records() {
2431 let (_temp, service) = fresh_service();
2432 record_finished_step(&service, "tls-step-1", "call-1", 1, 2, 1_700_000_000_000).await;
2433 record_finished_step(&service, "tls-step-2", "call-2", 2, 3, 1_700_000_000_100).await;
2434
2435 let fork = service
2436 .fork_timeline_from_selector(Request::new(ForkTimelineFromSelectorRequest {
2437 repo_path: String::new(),
2438 selector: Some(TimelineSeekSelector {
2439 target: Some(timeline_seek_selector::Target::NativeToolCall(
2440 TimelineSeekNativeToolCallSelector {
2441 thread: "main".to_string(),
2442 harness: "opencode".to_string(),
2443 session_id: "session-1".to_string(),
2444 message_id: "message-1".to_string(),
2445 tool_call_id: "call-1".to_string(),
2446 },
2447 )),
2448 }),
2449 branch_id: "tlb-child".to_string(),
2450 reason: "fan-out".to_string(),
2451 client_operation_id: String::new(),
2452 }))
2453 .await
2454 .unwrap()
2455 .into_inner();
2456
2457 assert_eq!(fork.branch_id, "tlb-child");
2458 assert_eq!(fork.parent_branch_id, "tlb-main");
2459 assert_eq!(fork.from_step_id, "tls-step-1");
2460 assert_eq!(fork.operation.unwrap().kind, "branch_created");
2461 assert!(
2462 fork.navigation
2463 .as_ref()
2464 .unwrap()
2465 .branches
2466 .iter()
2467 .any(|branch| branch.branch_id == "tlb-child")
2468 );
2469
2470 let reset = service
2471 .reset_timeline_cursor(Request::new(ResetTimelineCursorRequest {
2472 repo_path: String::new(),
2473 selector: Some(TimelineSeekSelector {
2474 target: Some(timeline_seek_selector::Target::Step(
2475 TimelineSeekStepSelector {
2476 thread: "main".to_string(),
2477 branch_id: "tlb-main".to_string(),
2478 step_id: "tls-step-1".to_string(),
2479 },
2480 )),
2481 }),
2482 mode: TimelineMaterializeMode::FailIfDirty as i32,
2483 materialize_checkout: false,
2484 client_operation_id: String::new(),
2485 }))
2486 .await
2487 .unwrap()
2488 .into_inner();
2489
2490 assert_eq!(reset.cursor_operation.unwrap().kind, "cursor_moved");
2491 assert!(reset.materialization.is_none());
2492 assert_eq!(
2493 reset.navigation.unwrap().cursor.unwrap().step_id,
2494 "tls-step-1"
2495 );
2496
2497 let recovery = service
2498 .recover_timeline_materialization(Request::new(RecoverTimelineMaterializationRequest {
2499 repo_path: String::new(),
2500 thread: "main".to_string(),
2501 client_operation_id: String::new(),
2502 }))
2503 .await
2504 .unwrap()
2505 .into_inner();
2506
2507 assert_eq!(
2508 recovery.recovery_status,
2509 WireTimelineMaterializationRecoveryStatus::NoPending as i32
2510 );
2511 assert!(recovery.recovered_cursor_operation.is_none());
2512 assert!(recovery.recovery_blockers.is_empty());
2513 }
2514}