Skip to main content

daemon/grpc_local_impl/
state_review.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local-mode implementation of the [`StateReviewService`] gRPC contract.
3//!
4//! Reads / writes the [`ReviewSignaturesBlob`] persisted at
5//! [`State::review_signatures`]. Verifies the client-supplied signature
6//! against the deterministic [`signing_payload`] before persisting.
7
8// `::state_review` disambiguates from this module's own name
9// (`grpc_local_impl::state_review`), the same way the hosted impl
10// disambiguates by being in a sibling module.
11use ::state_review::{
12    PathSymbol, ReadingOrderPartition, SymbolKind, build_review_payload_partition,
13};
14use crypto::verify_payload_signature;
15use grpc::heddle::v1::{
16    AnchoredDiscussion, GetReviewPayloadRequest, ListSignaturesRequest, ListSignaturesResponse,
17    MergeRequirement, PathSymbolRef as ProtoPathSymbolRef,
18    ReadingOrderPartition as ProtoReadingOrderPartition, ReviewPayload,
19    ReviewScope as ProtoReviewScope, ReviewSignature as ProtoReviewSignature, ReviewSummary,
20    RiskSignal as ProtoRiskSignal, SignStateRequest, SignStateResponse,
21    SignalAnchor as ProtoSignalAnchor, SigningFooter,
22    state_review_service_server::StateReviewService,
23};
24use objects::{
25    lock::RepositoryLockExt,
26    object::{
27        Blob, ChangeId, DiffKind, Discussion, DiscussionResolution, DiscussionsBlob, ReviewKind,
28        ReviewScope, ReviewSignature, ReviewSignaturesBlob, RiskSignalBlob, State, SymbolAnchor,
29        signing_payload,
30    },
31    store::ObjectStore,
32    worktree::diff_blobs,
33};
34use prost::Message;
35use repo::Repository;
36use tonic::{Request, Response, Status};
37
38use super::{GrpcLocalService, to_status, with_idempotency};
39
/// Maximum drift (seconds) between the client-supplied `signed_at`
/// timestamp and the server's wall clock. Generous enough to absorb NTP
/// skew, narrow enough to bound the window for replay-style attacks.
///
/// `execute_sign_state` rejects a request with `invalid_argument` when the
/// absolute difference between `signed_at` and server time exceeds this
/// value (5 minutes).
const SIGN_TIMESTAMP_SKEW_SECS: i64 = 5 * 60;
44
/// Local-mode `StateReviewService` implementation.
///
/// A thin wrapper around [`GrpcLocalService`]; every handler reaches the
/// repository through `inner.repo()`.
#[derive(Clone)]
pub struct LocalStateReviewService {
    /// Shared local-mode service state (repository access and the context
    /// passed to `with_idempotency`).
    inner: GrpcLocalService,
}
50
51impl LocalStateReviewService {
52    pub fn new(inner: GrpcLocalService) -> Self {
53        Self { inner }
54    }
55}
56
57#[tonic::async_trait]
58impl StateReviewService for LocalStateReviewService {
    /// Builds the [`ReviewPayload`] for the state identified by
    /// `req.state_id`.
    ///
    /// The payload combines:
    /// - a [`ReviewSummary`] whose file / line counts come from
    ///   `compute_state_diff_summary` and whose headline is the state's
    ///   intent (empty when unset);
    /// - the agent narrative (the intent, only when the state carries an
    ///   agent attribution);
    /// - the persisted risk signals, only when `include_all_signals` is set
    ///   and the state references a risk-signal blob that exists in the store;
    /// - synthesized `diff_summary.*` signals (one per changed path, capped at
    ///   `MAX_DIFF_SIGNAL_ANCHORS`, or a single anchor-less entry when no
    ///   paths changed);
    /// - the reading-order partition and the state's anchored discussions.
    ///
    /// # Errors
    /// - `invalid_argument` when `state_id` does not parse as a change id.
    /// - `not_found` when the state is not in the store.
    /// - `internal` when a referenced discussions blob is missing, or when the
    ///   risk-signal / discussions blob fails to decode.
    /// - Object-store errors are mapped through `to_status`.
    async fn get_review_payload(
        &self,
        request: Request<GetReviewPayloadRequest>,
    ) -> Result<Response<ReviewPayload>, Status> {
        let req = request.into_inner();
        let change_id = parse_change_id(&req.state_id)?;
        let repo = self.inner.repo();
        let state = repo
            .store()
            .get_state(&change_id)
            .map_err(to_status)?
            .ok_or_else(|| {
                Status::not_found(format!("state {} not found", change_id.to_string_full()))
            })?;

        // Diff the state's tree against its first parent so the summary
        // counts reflect what actually changed in this state. The
        // signal registry / budgeter will eventually layer on top of
        // this; until then `files_changed` is the most useful single
        // number an agent can use for self-review.
        let diff_summary = compute_state_diff_summary(repo, &state).map_err(to_status)?;

        // The two signal counts are placeholders here; they are filled in
        // once both signal lists have been built (see the end of the method).
        let summary = ReviewSummary {
            headline: state.intent.clone().unwrap_or_default(),
            files_changed: diff_summary.files_changed,
            added_lines: diff_summary.added_lines,
            removed_lines: diff_summary.removed_lines,
            in_budget_signal_count: 0,
            hidden_signal_count: 0,
        };

        // Only agent-attributed states get a narrative; it reuses the intent.
        let agent_narrative = if state.attribution.agent.is_some() {
            state.intent.clone().unwrap_or_default()
        } else {
            String::new()
        };

        // Surface fired risk signals if requested. The signal registry will
        // replace this with a proper budget split; until then everything we
        // read is "visible".
        // A risk-signal hash whose blob is absent from the store leaves
        // `all_signals` empty rather than failing the request.
        let mut all_signals: Vec<ProtoRiskSignal> = Vec::new();
        if req.include_all_signals
            && let Some(hash) = state.risk_signals
            && let Some(blob) = repo.store().get_blob(&hash).map_err(to_status)?
        {
            let decoded = RiskSignalBlob::decode(blob.content())
                .map_err(|err| Status::internal(format!("decode risk signals: {err}")))?;
            all_signals = decoded
                .signals
                .into_iter()
                .map(|s| risk_signal_to_proto(s, "visible"))
                .collect();
        }

        // Synthesize a structured `diff_summary` signal so the
        // `in_budget_signals` array is non-empty even before the real
        // signal registry is wired up. Anchored on each modified
        // file (capped) so consumers can iterate without losing the
        // summary aggregate. This is a deliberate stable shape: agents
        // already iterating signals get a usable record per file
        // change, and the registry-driven path will simply layer real
        // signals alongside it.
        let mut in_budget_signals: Vec<ProtoRiskSignal> = Vec::new();
        // Classify the change set by which of the (added, modified, deleted)
        // file counts are non-zero.
        let summary_kind = match (
            diff_summary.added_files,
            diff_summary.modified_files,
            diff_summary.deleted_files,
        ) {
            (a, 0, 0) if a > 0 => "diff_summary.added_only",
            (0, m, 0) if m > 0 => "diff_summary.modified_only",
            (0, 0, d) if d > 0 => "diff_summary.deleted_only",
            (0, 0, 0) => "diff_summary.empty",
            _ => "diff_summary.mixed",
        };
        let summary_reason = format!(
            "{} files changed (+{}/-{}, {} added, {} modified, {} deleted)",
            diff_summary.files_changed,
            diff_summary.added_lines,
            diff_summary.removed_lines,
            diff_summary.added_files,
            diff_summary.modified_files,
            diff_summary.deleted_files,
        );
        // Per-file anchors keep the array reasoning-friendly when
        // many files change, but cap so very large diffs don't bloat
        // the payload. The aggregate summary always rides on the
        // first entry's reason field; the rest carry per-file deltas.
        const MAX_DIFF_SIGNAL_ANCHORS: usize = 32;
        if diff_summary.changed_paths.is_empty() {
            // No per-file anchors available: emit one anchor-less entry that
            // carries the aggregate summary.
            in_budget_signals.push(ProtoRiskSignal {
                kind: summary_kind.to_string(),
                anchor: Some(ProtoSignalAnchor {
                    file: String::new(),
                    symbol: String::new(),
                    start_line: 0,
                    end_line: 0,
                }),
                reason: summary_reason.clone(),
                producer_module: "review_show.diff_summary".to_string(),
                producer_version: 1,
                computed_at: None,
                visibility: "visible".to_string(),
            });
        } else {
            for (idx, path_kind) in diff_summary
                .changed_paths
                .iter()
                .take(MAX_DIFF_SIGNAL_ANCHORS)
                .enumerate()
            {
                let reason = if idx == 0 {
                    summary_reason.clone()
                } else {
                    format!("{} ({})", path_kind.path, path_kind.kind_str())
                };
                in_budget_signals.push(ProtoRiskSignal {
                    kind: summary_kind.to_string(),
                    anchor: Some(ProtoSignalAnchor {
                        file: path_kind.path.clone(),
                        symbol: String::new(),
                        start_line: 0,
                        end_line: 0,
                    }),
                    reason,
                    producer_module: "review_show.diff_summary".to_string(),
                    producer_version: 1,
                    computed_at: None,
                    visibility: "visible".to_string(),
                });
            }
        }

        // Server-side reading-order partition. Same per-symbol
        // extraction logic as the hosted handler: tree-sitter when the
        // `semantic` feature is enabled, path-only fallback otherwise.
        let symbols = changed_files_as_symbols(repo, &state).map_err(to_status)?;
        let partition = build_review_payload_partition(&symbols);

        // Project the state's `DiscussionsBlob` (when present)
        // into the wire-shape `AnchoredDiscussion` list.
        // Unlike risk signals, a referenced-but-missing discussions blob is
        // reported as an internal error.
        let discussions = match state.discussions {
            Some(hash) => {
                let blob = repo
                    .store()
                    .get_blob(&hash)
                    .map_err(to_status)?
                    .ok_or_else(|| {
                        Status::internal(format!(
                            "discussions blob {} referenced by state {} is missing",
                            hash,
                            state.change_id.to_string_full()
                        ))
                    })?;
                let decoded = DiscussionsBlob::decode(blob.content())
                    .map_err(|err| Status::internal(format!("decode discussions: {err}")))?;
                decoded
                    .discussions
                    .iter()
                    .map(discussion_to_anchored_proto)
                    .collect()
            }
            None => Vec::<AnchoredDiscussion>::new(),
        };

        // Fill in the signal counts now that both lists are final.
        // NOTE(review): `in_budget_signals` holds synthesized diff-summary
        // entries, not a subset of `all_signals`, so the subtraction below may
        // not equal the number of signals actually hidden — confirm intent.
        let mut summary = summary;
        summary.in_budget_signal_count = in_budget_signals.len() as u32;
        summary.hidden_signal_count =
            all_signals.len().saturating_sub(in_budget_signals.len()) as u32;

        let payload = ReviewPayload {
            state_id: req.state_id.clone(),
            summary: Some(summary),
            agent_narrative,
            partition: Some(partition_to_proto(partition)),
            in_budget_signals,
            all_signals,
            tick_budget: 3,
            discussions,
            // Local mode has no policy registry — merge requirements
            // are surfaced only by the hosted handler.
            merge_requirements: Vec::<MergeRequirement>::new(),
            signing_footer: Some(SigningFooter {
                available_kinds: vec![
                    grpc::heddle::v1::ReviewKind::Read as i32,
                    grpc::heddle::v1::ReviewKind::AgentPreview as i32,
                    grpc::heddle::v1::ReviewKind::AgentCoReview as i32,
                ],
            }),
        };

        Ok(Response::new(payload))
    }
251
252    async fn sign_state(
253        &self,
254        request: Request<SignStateRequest>,
255    ) -> Result<Response<SignStateResponse>, Status> {
256        let req = request.into_inner();
257        let req_bytes = req.encode_to_vec();
258        let client_operation_id = req.client_operation_id.clone();
259        let inner = self.inner.clone();
260
261        let response = with_idempotency(
262            &self.inner,
263            &client_operation_id,
264            "state_review.sign_state",
265            &req_bytes,
266            move || {
267                let inner = inner.clone();
268                async move { execute_sign_state(&inner, req).await }
269            },
270        )
271        .await?;
272
273        Ok(Response::new(response))
274    }
275
276    async fn list_signatures(
277        &self,
278        request: Request<ListSignaturesRequest>,
279    ) -> Result<Response<ListSignaturesResponse>, Status> {
280        let req = request.into_inner();
281        let change_id = parse_change_id(&req.state_id)?;
282        let repo = self.inner.repo();
283        let state = repo
284            .store()
285            .get_state(&change_id)
286            .map_err(to_status)?
287            .ok_or_else(|| {
288                Status::not_found(format!("state {} not found", change_id.to_string_full()))
289            })?;
290
291        let signatures = match state.review_signatures {
292            Some(hash) => {
293                let blob = repo
294                    .store()
295                    .get_blob(&hash)
296                    .map_err(to_status)?
297                    .ok_or_else(|| {
298                        Status::internal(format!(
299                            "review signatures blob {} missing from object store",
300                            hash
301                        ))
302                    })?;
303                let decoded = ReviewSignaturesBlob::decode(blob.content())
304                    .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?;
305                decoded
306                    .signatures
307                    .into_iter()
308                    .enumerate()
309                    .map(|(idx, sig)| review_signature_to_proto(sig, synthetic_signature_id(idx)))
310                    .collect()
311            }
312            None => Vec::new(),
313        };
314
315        Ok(Response::new(ListSignaturesResponse { signatures }))
316    }
317}
318
319/// Body of [`LocalStateReviewService::sign_state`]. Lifted out of the trait
320/// method so [`with_idempotency`] can re-execute it inside its closure.
321async fn execute_sign_state(
322    inner: &GrpcLocalService,
323    req: SignStateRequest,
324) -> Result<SignStateResponse, Status> {
325    // 1. Validate the kind.
326    let kind = match grpc::heddle::v1::ReviewKind::try_from(req.kind)
327        .map_err(|_| Status::invalid_argument(format!("unknown review kind tag {}", req.kind)))?
328    {
329        grpc::heddle::v1::ReviewKind::Read => ReviewKind::Read,
330        grpc::heddle::v1::ReviewKind::AgentPreview => ReviewKind::AgentPreview,
331        grpc::heddle::v1::ReviewKind::AgentCoReview => ReviewKind::AgentCoReview,
332        grpc::heddle::v1::ReviewKind::Unspecified => {
333            return Err(Status::invalid_argument("review kind is required"));
334        }
335    };
336
337    // 2. Locate the state.
338    let change_id = parse_change_id(&req.state_id)?;
339    let repo = inner.repo();
340
341    // 3. Translate the scope.
342    let scope = match req.scope.as_ref() {
343        Some(s) => proto_scope_to_object(s)?,
344        None => ReviewScope::WholeChange,
345    };
346
347    // 4. Build the ReviewSignature, then verify the client-supplied
348    // signature is well-formed and matches the deterministic signing
349    // payload. A malformed or forged signature must never reach the
350    // persisted blob. Attribute the signature to the local-mode
351    // caller (`Repository::get_principal` resolves env vars then
352    // `[principal]` in `.heddle/config.toml`), not the state's author
353    // — Bob signing Alice's state should record Bob.
354    let actor = repo
355        .get_principal()
356        .map_err(|err| Status::internal(format!("resolve caller principal: {err}")))?;
357    let justification = if req.justification.is_empty() {
358        None
359    } else {
360        Some(req.justification.clone())
361    };
362
363    let now = chrono::Utc::now().timestamp();
364    let signed_at = req.signed_at.as_ref().map(|t| t.seconds).unwrap_or(0);
365    if signed_at == 0 {
366        return Err(Status::invalid_argument(
367            "signed_at is required and must match the timestamp the client signed over",
368        ));
369    }
370    if (signed_at - now).abs() > SIGN_TIMESTAMP_SKEW_SECS {
371        return Err(Status::invalid_argument(format!(
372            "signed_at={signed_at} is too far from server time={now} (max skew {SIGN_TIMESTAMP_SKEW_SECS}s)"
373        )));
374    }
375
376    let new_sig = ReviewSignature {
377        actor,
378        kind,
379        scope: scope.clone(),
380        justification: justification.clone(),
381        signed_at,
382        algorithm: req.algorithm.clone(),
383        public_key: hex::encode(&req.public_key),
384        signature: hex::encode(&req.signature),
385    };
386    new_sig
387        .validate()
388        .map_err(|err| Status::invalid_argument(format!("invalid review signature: {err}")))?;
389
390    let public_key_bytes = req.public_key.clone();
391    let signature_bytes = req.signature.clone();
392    let payload = signing_payload(change_id, kind, &scope, signed_at, justification.as_deref());
393    verify_payload_signature(
394        &payload,
395        &req.algorithm,
396        &public_key_bytes,
397        &signature_bytes,
398    )
399    .map_err(|err| {
400        Status::invalid_argument(format!(
401            "review signature failed verification ({}): {err}",
402            req.algorithm
403        ))
404    })?;
405
406    // 5. Serialize the read-modify-write on `review_signatures`
407    // behind the repo write-lock and re-load the state inside the
408    // critical section. Two concurrent SignStates with different
409    // operation ids would otherwise both read the same base blob and
410    // the second `put_state` would clobber the first signature.
411    let _lock = repo
412        .locker()
413        .write()
414        .map_err(|err| Status::internal(err.to_string()))?;
415    let state = repo
416        .store()
417        .get_state(&change_id)
418        .map_err(to_status)?
419        .ok_or_else(|| {
420            Status::not_found(format!("state {} not found", change_id.to_string_full()))
421        })?;
422    let mut blob = match state.review_signatures {
423        Some(hash) => {
424            let raw = repo
425                .store()
426                .get_blob(&hash)
427                .map_err(to_status)?
428                .ok_or_else(|| {
429                    Status::internal(format!(
430                        "existing review signatures blob {} missing from object store",
431                        hash
432                    ))
433                })?;
434            ReviewSignaturesBlob::decode(raw.content())
435                .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?
436        }
437        None => ReviewSignaturesBlob::new(Vec::new()),
438    };
439    blob.signatures.push(new_sig);
440    let new_index = blob.signatures.len() - 1;
441
442    // 6. Persist the new blob.
443    let bytes = blob
444        .encode()
445        .map_err(|err| Status::internal(format!("encode review signatures: {err}")))?;
446    let content_hash = repo
447        .store()
448        .put_blob(&Blob::new(bytes))
449        .map_err(to_status)?;
450
451    // 7. Persist the updated state.
452    let new_state = state.with_review_signatures(content_hash);
453    repo.store().put_state(&new_state).map_err(to_status)?;
454
455    Ok(SignStateResponse {
456        signature_id: synthetic_signature_id(new_index),
457        state_id: req.state_id,
458    })
459}
460
/// Derives a wire-level signature id from a signature's position within a
/// state's signatures blob. `ReviewSignature` has no id field of its own, so
/// the index is what keeps ids stable within a single state. (A future
/// schema bump may add an explicit id.)
fn synthetic_signature_id(index: usize) -> String {
    let mut id = String::from("rs-");
    id.push_str(&index.to_string());
    id
}
467
468fn parse_change_id(s: &[u8]) -> Result<ChangeId, Status> {
469    ChangeId::try_from_slice(s)
470        .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))
471}
472
473fn proto_scope_to_object(scope: &ProtoReviewScope) -> Result<ReviewScope, Status> {
474    use grpc::heddle::v1::review_scope::Scope;
475    match scope.scope.as_ref() {
476        None | Some(Scope::WholeChange(_)) => Ok(ReviewScope::WholeChange),
477        Some(Scope::Symbols(list)) => {
478            if list.symbols.is_empty() {
479                return Err(Status::invalid_argument(
480                    "symbols scope requires at least one symbol anchor",
481                ));
482            }
483            let symbols = list
484                .symbols
485                .iter()
486                .map(|s| SymbolAnchor::new(s.file.clone(), s.symbol.clone()))
487                .collect();
488            Ok(ReviewScope::Symbols(symbols))
489        }
490    }
491}
492
493fn object_scope_to_proto(scope: &ReviewScope) -> ProtoReviewScope {
494    use grpc::heddle::v1::review_scope::{Scope, SymbolList, WholeChange};
495    let inner = match scope {
496        ReviewScope::WholeChange => Scope::WholeChange(WholeChange {}),
497        ReviewScope::Symbols(symbols) => Scope::Symbols(SymbolList {
498            symbols: symbols
499                .iter()
500                .map(|s| ProtoPathSymbolRef {
501                    file: s.file.clone(),
502                    symbol: s.symbol.clone(),
503                })
504                .collect(),
505        }),
506    };
507    ProtoReviewScope { scope: Some(inner) }
508}
509
510fn review_signature_to_proto(sig: ReviewSignature, signature_id: String) -> ProtoReviewSignature {
511    ProtoReviewSignature {
512        signature_id,
513        actor_name: sig.actor.name.clone(),
514        actor_email: sig.actor.email.clone(),
515        kind: review_kind_to_proto(sig.kind) as i32,
516        scope: Some(object_scope_to_proto(&sig.scope)),
517        justification: sig.justification.unwrap_or_default(),
518        signed_at: Some(prost_types::Timestamp {
519            seconds: sig.signed_at,
520            nanos: 0,
521        }),
522        algorithm: sig.algorithm,
523        public_key: hex::decode(&sig.public_key).unwrap_or_default(),
524        signature: hex::decode(&sig.signature).unwrap_or_default(),
525    }
526}
527
528fn review_kind_to_proto(kind: ReviewKind) -> grpc::heddle::v1::ReviewKind {
529    match kind {
530        ReviewKind::Read => grpc::heddle::v1::ReviewKind::Read,
531        ReviewKind::AgentPreview => grpc::heddle::v1::ReviewKind::AgentPreview,
532        ReviewKind::AgentCoReview => grpc::heddle::v1::ReviewKind::AgentCoReview,
533    }
534}
535
536fn risk_signal_to_proto(sig: objects::object::RiskSignal, visibility: &str) -> ProtoRiskSignal {
537    let (start_line, end_line) = sig.anchor.line_range.unwrap_or((0, 0));
538    ProtoRiskSignal {
539        kind: sig.kind.as_str().to_string(),
540        anchor: Some(ProtoSignalAnchor {
541            file: sig.anchor.file,
542            symbol: sig.anchor.symbol.unwrap_or_default(),
543            start_line,
544            end_line,
545        }),
546        reason: sig.reason,
547        producer_module: sig.producer.module,
548        producer_version: sig.producer.version,
549        computed_at: Some(prost_types::Timestamp {
550            seconds: sig.computed_at,
551            nanos: 0,
552        }),
553        visibility: visibility.to_string(),
554    }
555}
556
557// ---------------------------------------------------------------------------
558// Symbol extraction + discussion projection (mirrors hosted impl).
559// ---------------------------------------------------------------------------
560
561/// Symbol projection for the reading-order partition. Mirrors the hosted-handler
562/// implementation: when the `semantic` feature is enabled and the
563/// changed file has a tree-sitter parser, emits one [`PathSymbol`] per
564/// definition. Otherwise falls back to a single path-only entry per
565/// changed file (kind = `Other`), routed by the path classifier.
566fn changed_files_as_symbols(
567    repo: &Repository,
568    state: &State,
569) -> objects::error::Result<Vec<PathSymbol>> {
570    let new_tree = match repo.store().get_tree(&state.tree)? {
571        Some(t) => t,
572        None => return Ok(Vec::new()),
573    };
574    let parent_tree = if let Some(parent_id) = state.parents.first() {
575        repo.store()
576            .get_state(parent_id)?
577            .and_then(|p| repo.store().get_tree(&p.tree).ok().flatten())
578    } else {
579        None
580    };
581    let new_files = collect_files(repo, &new_tree, "")?;
582    let parent_files = match parent_tree {
583        Some(t) => collect_files(repo, &t, "")?,
584        None => std::collections::HashMap::new(),
585    };
586
587    let mut out: Vec<PathSymbol> = Vec::new();
588    for (path, hash) in &new_files {
589        let changed = parent_files.get(path).map(|h| h != hash).unwrap_or(true);
590        if !changed {
591            continue;
592        }
593        #[cfg_attr(not(feature = "semantic"), allow(unused_mut))]
594        let mut emitted_any = false;
595        #[cfg(feature = "semantic")]
596        {
597            if let Some(blob) = repo.store().get_blob(hash)? {
598                emitted_any = extract_file_symbols(path, blob.content(), &mut out);
599            }
600        }
601        let _ = hash;
602        if !emitted_any {
603            out.push(PathSymbol {
604                file: path.clone(),
605                symbol: path.clone(),
606                kind: SymbolKind::Other,
607            });
608        }
609    }
610    Ok(out)
611}
612
/// Appends one [`PathSymbol`] per definition found in `source` to `out`.
///
/// Returns `false` (leaving `out` untouched) when extraction fails or finds
/// no definitions, so the caller can fall back to a path-only entry.
/// Definitions with a non-empty parent name are emitted as `parent::name`.
#[cfg(feature = "semantic")]
fn extract_file_symbols(path: &str, source: &[u8], out: &mut Vec<PathSymbol>) -> bool {
    use ::semantic::symbol_resolver::{Definition, DefinitionKind, extract_definitions};

    let found: Vec<Definition> = match extract_definitions(source, std::path::Path::new(path)) {
        Ok(defs) if !defs.is_empty() => defs,
        _ => return false,
    };

    out.extend(found.into_iter().map(|def| {
        let kind = match def.kind {
            DefinitionKind::Type => SymbolKind::Type,
            DefinitionKind::Trait => SymbolKind::Trait,
            DefinitionKind::Class => SymbolKind::Class,
            DefinitionKind::Interface => SymbolKind::Interface,
            DefinitionKind::TypeAlias => SymbolKind::TypeAlias,
            DefinitionKind::EnumDef => SymbolKind::EnumDef,
            DefinitionKind::ConstDecl => SymbolKind::ConstDecl,
            DefinitionKind::Module => SymbolKind::Module,
            DefinitionKind::Function => SymbolKind::Function,
            DefinitionKind::Other => SymbolKind::Other,
        };
        // Qualify nested definitions with their parent's name.
        let symbol = match def.parent_name.as_deref().filter(|p| !p.is_empty()) {
            Some(parent) => format!("{parent}::{}", def.name),
            None => def.name,
        };
        PathSymbol {
            file: path.to_string(),
            symbol,
            kind,
        }
    }));
    true
}
649
650fn collect_files(
651    repo: &Repository,
652    tree: &objects::object::Tree,
653    prefix: &str,
654) -> objects::error::Result<std::collections::HashMap<String, objects::object::ContentHash>> {
655    let mut out = std::collections::HashMap::new();
656    for entry in tree.entries() {
657        let path = if prefix.is_empty() {
658            entry.name.clone()
659        } else {
660            format!("{prefix}/{}", entry.name)
661        };
662        if entry.is_tree() {
663            if let Some(subtree) = repo.store().get_tree(&entry.hash)? {
664                let sub = collect_files(repo, &subtree, &path)?;
665                out.extend(sub);
666            }
667        } else {
668            out.insert(path, entry.hash);
669        }
670    }
671    Ok(out)
672}
673
674fn partition_to_proto(p: ReadingOrderPartition) -> ProtoReadingOrderPartition {
675    ProtoReadingOrderPartition {
676        structural: p.structural.iter().map(path_symbol_to_proto).collect(),
677        consequence: p.consequence.iter().map(path_symbol_to_proto).collect(),
678        tests_and_docs: p.tests_and_docs.iter().map(path_symbol_to_proto).collect(),
679    }
680}
681
682fn path_symbol_to_proto(p: &PathSymbol) -> ProtoPathSymbolRef {
683    ProtoPathSymbolRef {
684        file: p.file.clone(),
685        symbol: p.symbol.clone(),
686    }
687}
688
689fn discussion_to_anchored_proto(d: &Discussion) -> AnchoredDiscussion {
690    AnchoredDiscussion {
691        id: d.id.clone(),
692        anchor: Some(ProtoPathSymbolRef {
693            file: d.anchor.file.clone(),
694            symbol: d.anchor.symbol.clone(),
695        }),
696        opened_against_state: d.opened_against_state.as_bytes().to_vec(),
697        opened_at: Some(prost_types::Timestamp {
698            seconds: d.opened_at,
699            nanos: 0,
700        }),
701        turns: d
702            .turns
703            .iter()
704            .map(|t| grpc::heddle::v1::DiscussionTurn {
705                author_name: t.author.name.clone(),
706                author_email: t.author.email.clone(),
707                body: t.body.clone(),
708                posted_at: Some(prost_types::Timestamp {
709                    seconds: t.posted_at,
710                    nanos: 0,
711                }),
712            })
713            .collect(),
714        resolution: Some(discussion_resolution_to_proto(&d.resolution)),
715        body_changed_since_open: d.body_changed_since_open,
716        orphaned: d.orphaned,
717        visibility: d.visibility.as_str().to_string(),
718    }
719}
720
721fn discussion_resolution_to_proto(
722    resolution: &DiscussionResolution,
723) -> grpc::heddle::v1::DiscussionResolution {
724    use grpc::heddle::v1::discussion_resolution::{
725        Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
726    };
727    let state = match resolution {
728        DiscussionResolution::Open => State::Open(Open {}),
729        DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
730            State::IntoAnnotation(ResolvedIntoAnnotation {
731                annotation_id: annotation_id.clone(),
732            })
733        }
734        DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
735            state_id: state_id.as_bytes().to_vec(),
736        }),
737        DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
738            reason: reason.clone(),
739        }),
740    };
741    grpc::heddle::v1::DiscussionResolution { state: Some(state) }
742}
743
744// ---------------------------------------------------------------------------
745// Diff summary helpers (state.tree vs first parent's tree).
746// ---------------------------------------------------------------------------
747
/// One changed file surfaced in the diff summary signal anchors: its path
/// plus how it changed.
///
/// `kind` is a full `objects::object::DiffKind`. The producer is expected to
/// filter out `Unchanged` entries before constructing this value, but
/// [`ChangedPath::kind_str`] still handles that variant.
/// NOTE(review): the filtering is not visible from this definition — confirm
/// against the code that builds `DiffSummary::changed_paths`.
#[derive(Debug, Clone)]
struct ChangedPath {
    /// Path of the changed file.
    path: String,
    /// How the file changed.
    kind: DiffKind,
}
756
757impl ChangedPath {
758    fn kind_str(&self) -> &'static str {
759        match self.kind {
760            DiffKind::Added => "added",
761            DiffKind::Modified => "modified",
762            DiffKind::Deleted => "deleted",
763            DiffKind::Unchanged => "unchanged",
764        }
765    }
766}
767
/// Aggregated counts plus a path list, computed by diffing
/// `state.tree` against the first parent's tree (or empty when the
/// state is a root). When `state.parents` is empty every file in the
/// state's tree counts as added, which makes "first capture" reviews
/// non-empty too. Only `state.tree` and `state.parents` feed into the
/// summary.
struct DiffSummary {
    /// Total number of changed files, reported as `ReviewSummary::files_changed`.
    files_changed: u32,
    /// Number of added files.
    added_files: u32,
    /// Number of modified files.
    modified_files: u32,
    /// Number of deleted files.
    deleted_files: u32,
    /// Total added lines, reported as `ReviewSummary::added_lines`.
    added_lines: u32,
    /// Total removed lines, reported as `ReviewSummary::removed_lines`.
    removed_lines: u32,
    /// Per-file change records used to anchor the synthesized diff-summary signals.
    changed_paths: Vec<ChangedPath>,
}
783
784/// Compute a summary diff for `state` vs its first parent. Errors
785/// from the object store propagate; missing trees / blobs are skipped
786/// silently (treated as zero-change for that path) so a partially
787/// pruned object store never blocks the review surface. The
788/// distinction matters: missing-object errors must become zero (the
789/// summary is best-effort, callers want a payload they can render),
790/// but genuine I/O errors must still propagate so a corrupt store
791/// surfaces loudly instead of silently truncating the review.
792fn compute_state_diff_summary(
793    repo: &Repository,
794    state: &State,
795) -> objects::error::Result<DiffSummary> {
796    use objects::object::Tree;
797    let parent_tree_hash = if let Some(parent_id) = state.parents.first() {
798        match repo.store().get_state(parent_id)? {
799            Some(parent_state) => parent_state.tree,
800            None => Tree::new().hash(),
801        }
802    } else {
803        Tree::new().hash()
804    };
805
806    // Resolve both tree objects up front so the missing-tree case
807    // becomes a synthesized empty changeset rather than an error from
808    // the recursive diff. `get_tree` returns `Ok(None)` for missing
809    // (not an error), and propagates only on genuine I/O — matching
810    // the policy the doc-comment claims.
811    let parent_tree_obj = repo.store().get_tree(&parent_tree_hash)?;
812    let new_tree_obj = repo.store().get_tree(&state.tree)?;
813
814    // If either tree is missing from the local store the diff is not
815    // meaningful — return an empty summary instead of erroring out.
816    // This mirrors the "Modified branch tolerates missing blobs" stance
817    // for the *tree* level: a partially pruned store should never block
818    // review payload retrieval, only render an empty summary.
819    let changes = if parent_tree_obj.is_some() && new_tree_obj.is_some() {
820        repo.diff_trees(&parent_tree_hash, &state.tree)?
821    } else {
822        objects::object::FileChangeSet::new()
823    };
824
825    // Compute per-file line deltas. We only count `Modified` here for
826    // the symmetric add/remove totals; `Added` files contribute every
827    // line as an add, and `Deleted` files contribute every line as a
828    // remove. Files with non-utf8 contents (e.g. binaries) silently
829    // contribute zero — `diff_blobs` already returns an empty vec in
830    // that case, and we mirror the same behavior for raw line counts.
831    let mut added_lines: u32 = 0;
832    let mut removed_lines: u32 = 0;
833    let mut changed_paths: Vec<ChangedPath> = Vec::with_capacity(changes.len());
834
835    let parent_files = match parent_tree_obj.as_ref() {
836        Some(t) => collect_files(repo, t, "")?,
837        None => std::collections::HashMap::new(),
838    };
839    let new_files = match new_tree_obj.as_ref() {
840        Some(t) => collect_files(repo, t, "")?,
841        None => std::collections::HashMap::new(),
842    };
843
844    let mut added_files: u32 = 0;
845    let mut modified_files: u32 = 0;
846    let mut deleted_files: u32 = 0;
847
848    for change in changes.iter() {
849        match change.kind {
850            DiffKind::Added => {
851                added_files += 1;
852                // Missing blob (`get_blob` returns `Ok(None)`) → file
853                // counts but contributes zero lines. Genuine I/O
854                // errors still propagate via `?` — same shape as the
855                // Modified branch's intent, but here we keep the
856                // distinction explicit so a corrupt store surfaces
857                // rather than getting silently swallowed.
858                if let Some(hash) = new_files.get(&change.path)
859                    && let Some(blob) = repo.store().get_blob(hash)?
860                {
861                    added_lines = added_lines.saturating_add(line_count(blob.content()));
862                }
863            }
864            DiffKind::Deleted => {
865                deleted_files += 1;
866                if let Some(hash) = parent_files.get(&change.path)
867                    && let Some(blob) = repo.store().get_blob(hash)?
868                {
869                    removed_lines = removed_lines.saturating_add(line_count(blob.content()));
870                }
871            }
872            DiffKind::Modified => {
873                modified_files += 1;
874                // `get_blob` already returns `Ok(None)` for a missing
875                // blob, so `?` here only fires on genuine I/O. Match
876                // the Added/Deleted branches' propagation policy
877                // explicitly instead of the older `.ok().flatten()`
878                // form, which silently swallowed IO errors and
879                // conflated them with "missing".
880                let old_blob = match parent_files.get(&change.path) {
881                    Some(h) => repo.store().get_blob(h)?,
882                    None => None,
883                };
884                let new_blob = match new_files.get(&change.path) {
885                    Some(h) => repo.store().get_blob(h)?,
886                    None => None,
887                };
888                if let (Some(old), Some(new)) = (old_blob, new_blob) {
889                    for line in diff_blobs(&old, &new) {
890                        match line {
891                            objects::worktree::DiffLine::Added(_) => {
892                                added_lines = added_lines.saturating_add(1);
893                            }
894                            objects::worktree::DiffLine::Removed(_) => {
895                                removed_lines = removed_lines.saturating_add(1);
896                            }
897                            objects::worktree::DiffLine::Context(_) => {}
898                        }
899                    }
900                }
901            }
902            DiffKind::Unchanged => continue,
903        }
904        changed_paths.push(ChangedPath {
905            path: change.path.clone(),
906            kind: change.kind,
907        });
908    }
909
910    Ok(DiffSummary {
911        files_changed: changed_paths.len() as u32,
912        added_files,
913        modified_files,
914        deleted_files,
915        added_lines,
916        removed_lines,
917        changed_paths,
918    })
919}
920
/// Number of lines in a file blob, counted the way git does.
///
/// * Non-utf8 (binary) content yields `0`; a "line" has no useful
///   meaning there, so bytes are deliberately not counted.
/// * Empty content yields `0`.
/// * A single trailing newline terminates the last line rather than
///   opening a phantom empty one; an unterminated final line still
///   counts.
fn line_count(content: &[u8]) -> u32 {
    match std::str::from_utf8(content) {
        Err(_) | Ok("") => 0,
        Ok(text) => {
            // Drop one terminating newline (if any); every remaining
            // newline separates two lines.
            let body = text.strip_suffix('\n').unwrap_or(text);
            let separators = body.bytes().filter(|&byte| byte == b'\n').count() as u32;
            separators.saturating_add(1)
        }
    }
}
938
939// ---------------------------------------------------------------------------
940// Tests
941// ---------------------------------------------------------------------------
942
943#[cfg(test)]
944mod tests {
945    use std::sync::Arc;
946
947    use crypto::Signer as _;
948    use grpc::heddle::v1::ReviewScope as ProtoReviewScope;
949    use repo::{Repository, operation_dedup::OperationDedupStore};
950    use tempfile::TempDir;
951
952    use super::*;
953
954    fn fresh_service() -> (LocalStateReviewService, Arc<Repository>, TempDir) {
955        let temp = TempDir::new().expect("create tempdir");
956        // SAFETY: tests run with a controlled environment; setting these
957        // env vars steers the default attribution into a predictable place.
958        // SAFETY: setting env vars in tests; rust 2024 marks these unsafe.
959        unsafe {
960            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
961            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
962        }
963        let repo = Repository::init_default(temp.path()).expect("init repo");
964        let dedup = OperationDedupStore::open(repo.heddle_dir()).expect("open dedup");
965        let repo = Arc::new(repo);
966        let svc =
967            LocalStateReviewService::new(GrpcLocalService::new(repo.clone(), Arc::new(dedup)));
968        (svc, repo, temp)
969    }
970
971    fn capture_state(repo: &Repository) -> ChangeId {
972        // Write a tiny file so snapshot has something to capture.
973        std::fs::write(repo.root().join("hello.txt"), b"hi").expect("write file");
974        let state = repo
975            .snapshot(Some("seed".to_string()), None)
976            .expect("snapshot");
977        state.change_id
978    }
979
980    fn sign_request(state_id: &ChangeId, op_id: &str) -> SignStateRequest {
981        let signer = crypto::Ed25519Signer::generate().expect("generate ed25519 key");
982        let scope = ReviewScope::WholeChange;
983        let signed_at = chrono::Utc::now().timestamp();
984        let payload = signing_payload(*state_id, ReviewKind::Read, &scope, signed_at, None);
985        let signature = signer.sign(&payload).expect("sign payload");
986        use grpc::heddle::v1::review_scope::{Scope, WholeChange};
987        SignStateRequest {
988            repo_path: String::new(),
989            state_id: state_id.as_bytes().to_vec(),
990            kind: grpc::heddle::v1::ReviewKind::Read as i32,
991            scope: Some(ProtoReviewScope {
992                scope: Some(Scope::WholeChange(WholeChange {})),
993            }),
994            justification: String::new(),
995            algorithm: "ed25519".to_string(),
996            public_key: signer.public_key().to_vec(),
997            signature: signature.clone(),
998            signed_at: Some(prost_types::Timestamp {
999                seconds: signed_at,
1000                nanos: 0,
1001            }),
1002            client_operation_id: op_id.to_string(),
1003        }
1004    }
1005
1006    #[tokio::test]
1007    #[serial_test::serial(process_global)]
1008    async fn sign_state_persists_to_review_signatures_blob() {
1009        let (svc, repo, _tmp) = fresh_service();
1010        let state_id = capture_state(&repo);
1011
1012        let resp = svc
1013            .sign_state(Request::new(sign_request(&state_id, "")))
1014            .await
1015            .expect("sign_state");
1016        assert!(!resp.get_ref().signature_id.is_empty());
1017        assert_eq!(resp.get_ref().state_id, state_id.as_bytes().to_vec());
1018
1019        let listing = svc
1020            .list_signatures(Request::new(ListSignaturesRequest {
1021                repo_path: String::new(),
1022                state_id: state_id.as_bytes().to_vec(),
1023            }))
1024            .await
1025            .expect("list_signatures");
1026        let sigs = &listing.get_ref().signatures;
1027        assert_eq!(sigs.len(), 1, "expected one signature, got {sigs:?}");
1028        assert_eq!(sigs[0].kind, grpc::heddle::v1::ReviewKind::Read as i32);
1029        assert_eq!(sigs[0].algorithm, "ed25519");
1030        assert_eq!(sigs[0].actor_name, "Alice Tester");
1031        assert_eq!(sigs[0].actor_email, "alice@example.com");
1032        let scope_case = sigs[0].scope.as_ref().and_then(|s| s.scope.as_ref());
1033        assert!(matches!(
1034            scope_case,
1035            Some(grpc::heddle::v1::review_scope::Scope::WholeChange(_))
1036        ));
1037    }
1038
1039    #[tokio::test]
1040    #[serial_test::serial(process_global)]
1041    async fn sign_state_idempotent() {
1042        let (svc, repo, _tmp) = fresh_service();
1043        let state_id = capture_state(&repo);
1044        let op_id = objects::object::OperationId::new().to_string();
1045        // The second call must replay the *same* request body — fresh
1046        // signatures hash differently, so we build once and clone.
1047        let req = sign_request(&state_id, &op_id);
1048
1049        let first = svc
1050            .sign_state(Request::new(req.clone()))
1051            .await
1052            .expect("first sign_state");
1053        let second = svc
1054            .sign_state(Request::new(req))
1055            .await
1056            .expect("second sign_state");
1057        assert_eq!(
1058            first.get_ref().signature_id,
1059            second.get_ref().signature_id,
1060            "replayed call must return the same signature_id"
1061        );
1062
1063        let listing = svc
1064            .list_signatures(Request::new(ListSignaturesRequest {
1065                repo_path: String::new(),
1066                state_id: state_id.as_bytes().to_vec(),
1067            }))
1068            .await
1069            .expect("list_signatures");
1070        assert_eq!(
1071            listing.get_ref().signatures.len(),
1072            1,
1073            "idempotent replay must not append a duplicate signature"
1074        );
1075    }
1076
1077    #[tokio::test]
1078    #[serial_test::serial(process_global)]
1079    async fn sign_state_rejects_forged_signature() {
1080        let (svc, repo, _tmp) = fresh_service();
1081        let state_id = capture_state(&repo);
1082        let mut req = sign_request(&state_id, "");
1083        // Flip the last byte of the signature so verification fails.
1084        let last = req.signature.len() - 1;
1085        req.signature[last] ^= 0xff;
1086
1087        let err = svc
1088            .sign_state(Request::new(req))
1089            .await
1090            .expect_err("forged signature must be rejected");
1091        assert_eq!(err.code(), tonic::Code::InvalidArgument, "{err:?}");
1092        assert!(
1093            err.message().contains("failed verification"),
1094            "unexpected error message: {}",
1095            err.message()
1096        );
1097    }
1098
1099    #[tokio::test]
1100    #[serial_test::serial(process_global)]
1101    async fn sign_state_rejects_skewed_timestamp() {
1102        let (svc, repo, _tmp) = fresh_service();
1103        let state_id = capture_state(&repo);
1104        let mut req = sign_request(&state_id, "");
1105        // Timestamp 1 hour in the future is well outside the skew window.
1106        if let Some(ts) = req.signed_at.as_mut() {
1107            ts.seconds += 60 * 60;
1108        }
1109
1110        let err = svc
1111            .sign_state(Request::new(req))
1112            .await
1113            .expect_err("skewed timestamp must be rejected");
1114        assert_eq!(err.code(), tonic::Code::InvalidArgument);
1115        assert!(err.message().contains("too far from server time"));
1116    }
1117
1118    #[tokio::test]
1119    #[serial_test::serial(process_global)]
1120    async fn sign_state_attributes_to_caller_not_state_author() {
1121        // Regression for the codex-flagged bug: sign_state used to
1122        // attribute the signature to the state's author. Bob signing
1123        // Alice's state would record Alice. The signature must record
1124        // the *caller*. In local mode the caller is resolved via
1125        // `Repository::get_principal()` (env vars then config).
1126        let (svc, repo, _tmp) = fresh_service();
1127        // `fresh_service` already set the env to Alice; capture the
1128        // state under Alice's identity so `state.attribution.principal`
1129        // is Alice.
1130        let state_id = capture_state(&repo);
1131
1132        // Now switch the local user to Bob and sign Alice's state.
1133        // SAFETY: tests run with a controlled environment.
1134        unsafe {
1135            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Bob Signer");
1136            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "bob@example.com");
1137        }
1138
1139        svc.sign_state(Request::new(sign_request(&state_id, "")))
1140            .await
1141            .expect("sign_state");
1142
1143        let listing = svc
1144            .list_signatures(Request::new(ListSignaturesRequest {
1145                repo_path: String::new(),
1146                state_id: state_id.as_bytes().to_vec(),
1147            }))
1148            .await
1149            .expect("list_signatures");
1150        let sigs = &listing.get_ref().signatures;
1151        assert_eq!(sigs.len(), 1);
1152        assert_eq!(
1153            sigs[0].actor_name, "Bob Signer",
1154            "signature must attribute to the caller (Bob), not the state author (Alice)"
1155        );
1156        assert_eq!(sigs[0].actor_email, "bob@example.com");
1157
1158        // Restore the env so other serial tests see the expected
1159        // Alice baseline.
1160        unsafe {
1161            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
1162            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
1163        }
1164    }
1165
    /// Two `sign_state` calls with distinct operation ids, driven
    /// together, must both end up in the stored signature list.
    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn sign_state_serializes_concurrent_appends() {
        // Regression for the codex-flagged race: two SignStates with
        // different operation ids could both read the same base
        // `review_signatures` blob, then the second `put_state` would
        // clobber the first signature. The fix wraps the
        // read-modify-write in `repo.locker().write()` and re-loads
        // the state inside the lock.
        let (svc, repo, _tmp) = fresh_service();
        let state_id = capture_state(&repo);

        // Two distinct ops so neither replays the other through dedup.
        let op_a = objects::object::OperationId::new().to_string();
        let op_b = objects::object::OperationId::new().to_string();
        let req_a = sign_request(&state_id, &op_a);
        let req_b = sign_request(&state_id, &op_b);

        let svc_a = svc.clone();
        let svc_b = svc.clone();
        // NOTE(review): `tokio::join!` drives both futures from this one
        // test task. Whether the two calls genuinely overlap depends on
        // where `sign_state` yields and on the runtime flavor of the
        // test — confirm this still fails when the lock is removed.
        let (a, b) = tokio::join!(
            svc_a.sign_state(Request::new(req_a)),
            svc_b.sign_state(Request::new(req_b)),
        );
        a.expect("first sign_state");
        b.expect("second sign_state");

        let listing = svc
            .list_signatures(Request::new(ListSignaturesRequest {
                repo_path: String::new(),
                state_id: state_id.as_bytes().to_vec(),
            }))
            .await
            .expect("list_signatures");
        assert_eq!(
            listing.get_ref().signatures.len(),
            2,
            "both concurrent signatures must land — neither should be lost \
             to a stale-blob clobber"
        );
    }
1207
    /// Regression: `get_review_payload` previously returned
    /// `summary.files_changed = 0` and empty `in_budget_signals` for
    /// every state, even when the diff against the parent had real
    /// content. This test snapshots once (root state — every file is
    /// "added"), then snapshots again after appending a line, and
    /// asserts both states report a non-empty diff summary plus a
    /// populated `diff_summary` signal. For the second state it also
    /// checks that the signal is anchored on the changed file.
    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn get_review_payload_populates_diff_summary_and_signals() {
        let (svc, repo, _tmp) = fresh_service();

        // First capture: root state, one file added. Every line of
        // `hello.txt` should count as added.
        std::fs::write(repo.root().join("hello.txt"), b"first\nsecond\nthird\n")
            .expect("write hello.txt");
        let first = repo
            .snapshot(Some("first capture".to_string()), None)
            .expect("first snapshot");

        let resp_first = svc
            .get_review_payload(Request::new(GetReviewPayloadRequest {
                repo_path: String::new(),
                state_id: first.change_id.as_bytes().to_vec(),
                include_all_signals: false,
            }))
            .await
            .expect("get_review_payload first");
        let payload_first = resp_first.into_inner();
        let summary_first = payload_first.summary.as_ref().expect("summary present");
        assert!(
            summary_first.files_changed >= 1,
            "first state should report at least one file changed (vs empty parent), got {}",
            summary_first.files_changed
        );
        assert!(
            summary_first.added_lines >= 3,
            "first state should report 3+ added lines, got {}",
            summary_first.added_lines
        );
        assert!(
            !payload_first.in_budget_signals.is_empty(),
            "in_budget_signals must include a diff_summary entry"
        );
        let first_signal = &payload_first.in_budget_signals[0];
        assert!(
            first_signal.kind.starts_with("diff_summary."),
            "expected synthetic diff_summary signal kind, got {}",
            first_signal.kind
        );
        assert_eq!(first_signal.producer_module, "review_show.diff_summary");
        assert_eq!(first_signal.visibility, "visible");

        // Second capture: append one line to the file. Diff vs the
        // first state's tree should report a single modified file with
        // at least one added line. The edit is a pure append, so no
        // removed-line count is asserted.
        std::fs::write(
            repo.root().join("hello.txt"),
            b"first\nsecond\nthird\nfourth\n",
        )
        .expect("modify hello.txt");
        let second = repo
            .snapshot(Some("second capture".to_string()), None)
            .expect("second snapshot");

        let resp_second = svc
            .get_review_payload(Request::new(GetReviewPayloadRequest {
                repo_path: String::new(),
                state_id: second.change_id.as_bytes().to_vec(),
                include_all_signals: false,
            }))
            .await
            .expect("get_review_payload second");
        let payload_second = resp_second.into_inner();
        let summary_second = payload_second.summary.as_ref().expect("summary present");
        assert_eq!(
            summary_second.files_changed, 1,
            "second state should report exactly one modified file"
        );
        assert!(
            summary_second.added_lines >= 1,
            "second state should report at least one added line, got {}",
            summary_second.added_lines
        );
        assert!(
            !payload_second.in_budget_signals.is_empty(),
            "in_budget_signals must include a diff_summary entry"
        );
        // The first in-budget signal is expected to be the synthetic
        // diff_summary entry: anchored on the changed file, with the
        // aggregate summary in its reason text.
        let signal = &payload_second.in_budget_signals[0];
        assert_eq!(
            signal
                .anchor
                .as_ref()
                .map(|a| a.file.as_str())
                .unwrap_or(""),
            "hello.txt",
            "diff_summary signal should anchor on the modified file"
        );
        assert!(
            signal.reason.contains("files changed"),
            "first signal reason should carry the aggregate summary, got {}",
            signal.reason
        );
        // The summary's signal-count fields should reflect the visible
        // budget so consumers can short-circuit on empty-vs-populated
        // without re-counting the array.
        assert_eq!(
            summary_second.in_budget_signal_count,
            payload_second.in_budget_signals.len() as u32,
            "in_budget_signal_count must match the array length"
        );
    }
1320
1321    /// Regression for codex feedback on PRs #52 (tree fallback) and
1322    /// #56 (blob fallback): `compute_state_diff_summary` must tolerate
1323    /// missing trees and blobs by returning an empty/partial summary
1324    /// rather than blocking the entire review payload. Construct a
1325    /// state whose `tree` points to a content hash that was never
1326    /// stored — `get_tree` returns `Ok(None)`, and the function must
1327    /// fall back to an empty changeset instead of erroring out of
1328    /// `diff_trees`. (Pre-fix, the Modified branch already tolerated
1329    /// missing blobs via `.ok().flatten()`, but the Added/Deleted
1330    /// branches and the top-level diff_trees call did not — surfaces
1331    /// of pruned object stores all errored out inconsistently.)
1332    #[tokio::test]
1333    #[serial_test::serial(process_global)]
1334    async fn get_review_payload_tolerates_missing_tree() {
1335        let (svc, repo, _tmp) = fresh_service();
1336        let state_id = capture_state(&repo);
1337
1338        // Load the state, swap its tree pointer to a synthetic hash
1339        // that nothing references, and re-persist. The state object
1340        // itself survives, but `repo.store().get_tree(state.tree)`
1341        // will return `Ok(None)` — the missing-tree case.
1342        let state = repo
1343            .store()
1344            .get_state(&state_id)
1345            .expect("get state")
1346            .expect("state present");
1347        let bogus_tree = objects::object::ContentHash::compute(b"definitely-not-in-store-bytes");
1348        let mut mutated = state.clone();
1349        mutated.tree = bogus_tree;
1350        repo.store().put_state(&mutated).expect("put mutated state");
1351
1352        // The review payload must still come back — empty summary,
1353        // but no Status::Internal error from the gRPC layer.
1354        let resp = svc
1355            .get_review_payload(Request::new(GetReviewPayloadRequest {
1356                repo_path: String::new(),
1357                state_id: state_id.as_bytes().to_vec(),
1358                include_all_signals: false,
1359            }))
1360            .await
1361            .expect("missing tree must not block review payload");
1362        let payload = resp.into_inner();
1363        let summary = payload.summary.as_ref().expect("summary present");
1364        assert_eq!(
1365            summary.files_changed, 0,
1366            "missing tree must produce a zero-change summary, got {} files",
1367            summary.files_changed
1368        );
1369        // Synthetic diff_summary signal should still be present (with
1370        // the `empty` kind) so consumers always see at least one
1371        // signal — keeps the wire shape stable.
1372        assert!(
1373            !payload.in_budget_signals.is_empty(),
1374            "in_budget_signals should always contain at least the synthetic diff_summary entry"
1375        );
1376        let kind = &payload.in_budget_signals[0].kind;
1377        assert!(
1378            kind.starts_with("diff_summary."),
1379            "expected synthetic diff_summary signal, got {kind}"
1380        );
1381    }
1382
1383    /// `line_count` should match git-style line counts — trailing
1384    /// newline never produces a phantom empty line, but an unterminated
1385    /// final line still counts.
1386    #[test]
1387    fn line_count_matches_git_semantics() {
1388        assert_eq!(line_count(b""), 0);
1389        assert_eq!(line_count(b"\n"), 1);
1390        assert_eq!(line_count(b"hello"), 1);
1391        assert_eq!(line_count(b"hello\n"), 1);
1392        assert_eq!(line_count(b"hello\nworld"), 2);
1393        assert_eq!(line_count(b"hello\nworld\n"), 2);
1394        assert_eq!(line_count(b"a\nb\nc\n"), 3);
1395        // Non-utf8 bytes count as zero (treated as binary).
1396        assert_eq!(line_count(&[0xff, 0xfe, 0xfd]), 0);
1397    }
1398}