Skip to main content

daemon/grpc_local_impl/
state_review.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local-mode implementation of the [`StateReviewService`] gRPC contract.
3//!
4//! Reads / writes the [`ReviewSignaturesBlob`] persisted at
5//! [`State::review_signatures`]. Verifies the client-supplied signature
6//! against the deterministic [`signing_payload`] before persisting.
7
8// `::state_review` disambiguates from this module's own name
9// (`grpc_local_impl::state_review`), the same way the hosted impl
10// disambiguates by being in a sibling module.
11use ::state_review::{
12    PathSymbol, ReadingOrderPartition, SymbolKind, build_review_payload_partition,
13};
14use crypto::verify_payload_signature;
15use grpc::heddle::v1::{
16    AnchoredDiscussion, GetReviewPayloadRequest, ListSignaturesRequest, ListSignaturesResponse,
17    MergeRequirement, PathSymbolRef as ProtoPathSymbolRef,
18    ReadingOrderPartition as ProtoReadingOrderPartition, ReviewPayload,
19    ReviewScope as ProtoReviewScope, ReviewSignature as ProtoReviewSignature, ReviewSummary,
20    RiskSignal as ProtoRiskSignal, SignStateRequest, SignStateResponse,
21    SignalAnchor as ProtoSignalAnchor, SigningFooter,
22    state_review_service_server::StateReviewService,
23};
24use objects::{
25    lock::RepositoryLockExt,
26    object::{
27        Blob, ChangeId, DiffKind, Discussion, DiscussionResolution, DiscussionsBlob, ReviewKind,
28        ReviewScope, ReviewSignature, ReviewSignaturesBlob, RiskSignalBlob, State, SymbolAnchor,
29        signing_payload,
30    },
31    worktree::diff_blobs,
32};
33use prost::Message;
34use repo::Repository;
35use tonic::{Request, Response, Status};
36
37use super::{GrpcLocalService, to_status, with_idempotency};
38
/// Maximum drift (seconds) between the client-supplied `signed_at`
/// timestamp and the server's wall clock. Generous enough to absorb NTP
/// skew, narrow enough to bound the window for replay-style attacks.
///
/// `execute_sign_state` compares the absolute difference against this
/// value, so the bound applies to timestamps in the past and in the future.
const SIGN_TIMESTAMP_SKEW_SECS: i64 = 5 * 60;
43
44/// Local-mode `StateReviewService` implementation.
45#[derive(Clone)]
46pub struct LocalStateReviewService {
47    inner: GrpcLocalService,
48}
49
50impl LocalStateReviewService {
51    pub fn new(inner: GrpcLocalService) -> Self {
52        Self { inner }
53    }
54}
55
56#[tonic::async_trait]
57impl StateReviewService for LocalStateReviewService {
58    async fn get_review_payload(
59        &self,
60        request: Request<GetReviewPayloadRequest>,
61    ) -> Result<Response<ReviewPayload>, Status> {
62        let req = request.into_inner();
63        let change_id = parse_change_id(&req.state_id)?;
64        let repo = self.inner.repo();
65        let state = repo
66            .store()
67            .get_state(&change_id)
68            .map_err(to_status)?
69            .ok_or_else(|| {
70                Status::not_found(format!("state {} not found", change_id.to_string_full()))
71            })?;
72
73        // Diff the state's tree against its first parent so the summary
74        // counts reflect what actually changed in this state. The
75        // signal registry / budgeter will eventually layer on top of
76        // this; until then `files_changed` is the most useful single
77        // number an agent can use for self-review.
78        let diff_summary = compute_state_diff_summary(repo, &state).map_err(to_status)?;
79
80        let summary = ReviewSummary {
81            headline: state.intent.clone().unwrap_or_default(),
82            files_changed: diff_summary.files_changed,
83            added_lines: diff_summary.added_lines,
84            removed_lines: diff_summary.removed_lines,
85            in_budget_signal_count: 0,
86            hidden_signal_count: 0,
87        };
88
89        let agent_narrative = if state.attribution.agent.is_some() {
90            state.intent.clone().unwrap_or_default()
91        } else {
92            String::new()
93        };
94
95        // Surface fired risk signals if requested. The signal registry will
96        // replace this with a proper budget split; until then everything we
97        // read is "visible".
98        let mut all_signals: Vec<ProtoRiskSignal> = Vec::new();
99        if req.include_all_signals
100            && let Some(hash) = state.risk_signals
101            && let Some(blob) = repo.store().get_blob(&hash).map_err(to_status)?
102        {
103            let decoded = RiskSignalBlob::decode(blob.content())
104                .map_err(|err| Status::internal(format!("decode risk signals: {err}")))?;
105            all_signals = decoded
106                .signals
107                .into_iter()
108                .map(|s| risk_signal_to_proto(s, "visible"))
109                .collect();
110        }
111
112        // Synthesize a structured `diff_summary` signal so the
113        // `in_budget_signals` array is non-empty even before the real
114        // signal registry is wired up. Anchored on each modified
115        // file (capped) so consumers can iterate without losing the
116        // summary aggregate. This is a deliberate stable shape: agents
117        // already iterating signals get a usable record per file
118        // change, and the registry-driven path will simply layer real
119        // signals alongside it.
120        let mut in_budget_signals: Vec<ProtoRiskSignal> = Vec::new();
121        let summary_kind = match (
122            diff_summary.added_files,
123            diff_summary.modified_files,
124            diff_summary.deleted_files,
125        ) {
126            (a, 0, 0) if a > 0 => "diff_summary.added_only",
127            (0, m, 0) if m > 0 => "diff_summary.modified_only",
128            (0, 0, d) if d > 0 => "diff_summary.deleted_only",
129            (0, 0, 0) => "diff_summary.empty",
130            _ => "diff_summary.mixed",
131        };
132        let summary_reason = format!(
133            "{} files changed (+{}/-{}, {} added, {} modified, {} deleted)",
134            diff_summary.files_changed,
135            diff_summary.added_lines,
136            diff_summary.removed_lines,
137            diff_summary.added_files,
138            diff_summary.modified_files,
139            diff_summary.deleted_files,
140        );
141        // Per-file anchors keep the array reasoning-friendly when
142        // many files change, but cap so very large diffs don't bloat
143        // the payload. The aggregate summary always rides on the
144        // first entry's reason field; the rest carry per-file deltas.
145        const MAX_DIFF_SIGNAL_ANCHORS: usize = 32;
146        if diff_summary.changed_paths.is_empty() {
147            in_budget_signals.push(ProtoRiskSignal {
148                kind: summary_kind.to_string(),
149                anchor: Some(ProtoSignalAnchor {
150                    file: String::new(),
151                    symbol: String::new(),
152                    start_line: 0,
153                    end_line: 0,
154                }),
155                reason: summary_reason.clone(),
156                producer_module: "review_show.diff_summary".to_string(),
157                producer_version: 1,
158                computed_at: None,
159                visibility: "visible".to_string(),
160            });
161        } else {
162            for (idx, path_kind) in diff_summary
163                .changed_paths
164                .iter()
165                .take(MAX_DIFF_SIGNAL_ANCHORS)
166                .enumerate()
167            {
168                let reason = if idx == 0 {
169                    summary_reason.clone()
170                } else {
171                    format!("{} ({})", path_kind.path, path_kind.kind_str())
172                };
173                in_budget_signals.push(ProtoRiskSignal {
174                    kind: summary_kind.to_string(),
175                    anchor: Some(ProtoSignalAnchor {
176                        file: path_kind.path.clone(),
177                        symbol: String::new(),
178                        start_line: 0,
179                        end_line: 0,
180                    }),
181                    reason,
182                    producer_module: "review_show.diff_summary".to_string(),
183                    producer_version: 1,
184                    computed_at: None,
185                    visibility: "visible".to_string(),
186                });
187            }
188        }
189
190        // Server-side reading-order partition. Same per-symbol
191        // extraction logic as the hosted handler: tree-sitter when the
192        // `semantic` feature is enabled, path-only fallback otherwise.
193        let symbols = changed_files_as_symbols(repo, &state).map_err(to_status)?;
194        let partition = build_review_payload_partition(&symbols);
195
196        // Project the state's `DiscussionsBlob` (when present)
197        // into the wire-shape `AnchoredDiscussion` list.
198        let discussions = match state.discussions {
199            Some(hash) => {
200                let blob = repo
201                    .store()
202                    .get_blob(&hash)
203                    .map_err(to_status)?
204                    .ok_or_else(|| {
205                        Status::internal(format!(
206                            "discussions blob {} referenced by state {} is missing",
207                            hash,
208                            state.change_id.to_string_full()
209                        ))
210                    })?;
211                let decoded = DiscussionsBlob::decode(blob.content())
212                    .map_err(|err| Status::internal(format!("decode discussions: {err}")))?;
213                decoded
214                    .discussions
215                    .iter()
216                    .map(discussion_to_anchored_proto)
217                    .collect()
218            }
219            None => Vec::<AnchoredDiscussion>::new(),
220        };
221
222        let mut summary = summary;
223        summary.in_budget_signal_count = in_budget_signals.len() as u32;
224        summary.hidden_signal_count =
225            all_signals.len().saturating_sub(in_budget_signals.len()) as u32;
226
227        let payload = ReviewPayload {
228            state_id: req.state_id.clone(),
229            summary: Some(summary),
230            agent_narrative,
231            partition: Some(partition_to_proto(partition)),
232            in_budget_signals,
233            all_signals,
234            tick_budget: 3,
235            discussions,
236            // Local mode has no policy registry — merge requirements
237            // are surfaced only by the hosted handler.
238            merge_requirements: Vec::<MergeRequirement>::new(),
239            signing_footer: Some(SigningFooter {
240                available_kinds: vec![
241                    grpc::heddle::v1::ReviewKind::Read as i32,
242                    grpc::heddle::v1::ReviewKind::AgentPreview as i32,
243                    grpc::heddle::v1::ReviewKind::AgentCoReview as i32,
244                ],
245            }),
246        };
247
248        Ok(Response::new(payload))
249    }
250
251    async fn sign_state(
252        &self,
253        request: Request<SignStateRequest>,
254    ) -> Result<Response<SignStateResponse>, Status> {
255        let req = request.into_inner();
256        let req_bytes = req.encode_to_vec();
257        let client_operation_id = req.client_operation_id.clone();
258        let inner = self.inner.clone();
259
260        let response = with_idempotency(
261            self.inner.dedup(),
262            &client_operation_id,
263            "state_review.sign_state",
264            &req_bytes,
265            |resp: &SignStateResponse| resp.encode_to_vec(),
266            |bytes| {
267                SignStateResponse::decode(&bytes[..]).map_err(|e| Status::internal(e.to_string()))
268            },
269            move || {
270                let inner = inner.clone();
271                async move { execute_sign_state(&inner, req).await }
272            },
273        )
274        .await?;
275
276        Ok(Response::new(response))
277    }
278
279    async fn list_signatures(
280        &self,
281        request: Request<ListSignaturesRequest>,
282    ) -> Result<Response<ListSignaturesResponse>, Status> {
283        let req = request.into_inner();
284        let change_id = parse_change_id(&req.state_id)?;
285        let repo = self.inner.repo();
286        let state = repo
287            .store()
288            .get_state(&change_id)
289            .map_err(to_status)?
290            .ok_or_else(|| {
291                Status::not_found(format!("state {} not found", change_id.to_string_full()))
292            })?;
293
294        let signatures = match state.review_signatures {
295            Some(hash) => {
296                let blob = repo
297                    .store()
298                    .get_blob(&hash)
299                    .map_err(to_status)?
300                    .ok_or_else(|| {
301                        Status::internal(format!(
302                            "review signatures blob {} missing from object store",
303                            hash
304                        ))
305                    })?;
306                let decoded = ReviewSignaturesBlob::decode(blob.content())
307                    .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?;
308                decoded
309                    .signatures
310                    .into_iter()
311                    .enumerate()
312                    .map(|(idx, sig)| review_signature_to_proto(sig, synthetic_signature_id(idx)))
313                    .collect()
314            }
315            None => Vec::new(),
316        };
317
318        Ok(Response::new(ListSignaturesResponse { signatures }))
319    }
320}
321
322/// Body of [`LocalStateReviewService::sign_state`]. Lifted out of the trait
323/// method so [`with_idempotency`] can re-execute it inside its closure.
324async fn execute_sign_state(
325    inner: &GrpcLocalService,
326    req: SignStateRequest,
327) -> Result<SignStateResponse, Status> {
328    // 1. Validate the kind.
329    let kind = match grpc::heddle::v1::ReviewKind::try_from(req.kind)
330        .map_err(|_| Status::invalid_argument(format!("unknown review kind tag {}", req.kind)))?
331    {
332        grpc::heddle::v1::ReviewKind::Read => ReviewKind::Read,
333        grpc::heddle::v1::ReviewKind::AgentPreview => ReviewKind::AgentPreview,
334        grpc::heddle::v1::ReviewKind::AgentCoReview => ReviewKind::AgentCoReview,
335        grpc::heddle::v1::ReviewKind::Unspecified => {
336            return Err(Status::invalid_argument("review kind is required"));
337        }
338    };
339
340    // 2. Locate the state.
341    let change_id = parse_change_id(&req.state_id)?;
342    let repo = inner.repo();
343
344    // 3. Translate the scope.
345    let scope = match req.scope.as_ref() {
346        Some(s) => proto_scope_to_object(s)?,
347        None => ReviewScope::WholeChange,
348    };
349
350    // 4. Build the ReviewSignature, then verify the client-supplied
351    // signature is well-formed and matches the deterministic signing
352    // payload. A malformed or forged signature must never reach the
353    // persisted blob. Attribute the signature to the local-mode
354    // caller (`Repository::get_principal` resolves env vars then
355    // `[principal]` in `.heddle/config.toml`), not the state's author
356    // — Bob signing Alice's state should record Bob.
357    let actor = repo
358        .get_principal()
359        .map_err(|err| Status::internal(format!("resolve caller principal: {err}")))?;
360    let justification = if req.justification.is_empty() {
361        None
362    } else {
363        Some(req.justification.clone())
364    };
365
366    let now = chrono::Utc::now().timestamp();
367    let signed_at = req.signed_at.as_ref().map(|t| t.seconds).unwrap_or(0);
368    if signed_at == 0 {
369        return Err(Status::invalid_argument(
370            "signed_at is required and must match the timestamp the client signed over",
371        ));
372    }
373    if (signed_at - now).abs() > SIGN_TIMESTAMP_SKEW_SECS {
374        return Err(Status::invalid_argument(format!(
375            "signed_at={signed_at} is too far from server time={now} (max skew {SIGN_TIMESTAMP_SKEW_SECS}s)"
376        )));
377    }
378
379    let new_sig = ReviewSignature {
380        actor,
381        kind,
382        scope: scope.clone(),
383        justification: justification.clone(),
384        signed_at,
385        algorithm: req.algorithm.clone(),
386        public_key: hex::encode(&req.public_key),
387        signature: hex::encode(&req.signature),
388    };
389    new_sig
390        .validate()
391        .map_err(|err| Status::invalid_argument(format!("invalid review signature: {err}")))?;
392
393    let public_key_bytes = req.public_key.clone();
394    let signature_bytes = req.signature.clone();
395    let payload = signing_payload(change_id, kind, &scope, signed_at, justification.as_deref());
396    verify_payload_signature(
397        &payload,
398        &req.algorithm,
399        &public_key_bytes,
400        &signature_bytes,
401    )
402    .map_err(|err| {
403        Status::invalid_argument(format!(
404            "review signature failed verification ({}): {err}",
405            req.algorithm
406        ))
407    })?;
408
409    // 5. Serialize the read-modify-write on `review_signatures`
410    // behind the repo write-lock and re-load the state inside the
411    // critical section. Two concurrent SignStates with different
412    // operation ids would otherwise both read the same base blob and
413    // the second `put_state` would clobber the first signature.
414    let _lock = repo
415        .locker()
416        .write()
417        .map_err(|err| Status::internal(err.to_string()))?;
418    let state = repo
419        .store()
420        .get_state(&change_id)
421        .map_err(to_status)?
422        .ok_or_else(|| {
423            Status::not_found(format!("state {} not found", change_id.to_string_full()))
424        })?;
425    let mut blob = match state.review_signatures {
426        Some(hash) => {
427            let raw = repo
428                .store()
429                .get_blob(&hash)
430                .map_err(to_status)?
431                .ok_or_else(|| {
432                    Status::internal(format!(
433                        "existing review signatures blob {} missing from object store",
434                        hash
435                    ))
436                })?;
437            ReviewSignaturesBlob::decode(raw.content())
438                .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?
439        }
440        None => ReviewSignaturesBlob::new(Vec::new()),
441    };
442    blob.signatures.push(new_sig);
443    let new_index = blob.signatures.len() - 1;
444
445    // 6. Persist the new blob.
446    let bytes = blob
447        .encode()
448        .map_err(|err| Status::internal(format!("encode review signatures: {err}")))?;
449    let content_hash = repo
450        .store()
451        .put_blob(&Blob::new(bytes))
452        .map_err(to_status)?;
453
454    // 7. Persist the updated state.
455    let new_state = state.with_review_signatures(content_hash);
456    repo.store().put_state(&new_state).map_err(to_status)?;
457
458    Ok(SignStateResponse {
459        signature_id: synthetic_signature_id(new_index),
460        state_id: req.state_id,
461    })
462}
463
/// Build the wire-level id for the signature stored at `index` within a
/// state's signature list.
///
/// `ReviewSignature` has no explicit id of its own, so the id is the
/// per-state position prefixed with `rs-` (e.g. index 3 becomes `rs-3`).
/// (A future schema bump may add an explicit id.)
fn synthetic_signature_id(index: usize) -> String {
    let mut id = String::from("rs-");
    id.push_str(&index.to_string());
    id
}
470
471fn parse_change_id(s: &[u8]) -> Result<ChangeId, Status> {
472    ChangeId::try_from_slice(s)
473        .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))
474}
475
476fn proto_scope_to_object(scope: &ProtoReviewScope) -> Result<ReviewScope, Status> {
477    use grpc::heddle::v1::review_scope::Scope;
478    match scope.scope.as_ref() {
479        None | Some(Scope::WholeChange(_)) => Ok(ReviewScope::WholeChange),
480        Some(Scope::Symbols(list)) => {
481            if list.symbols.is_empty() {
482                return Err(Status::invalid_argument(
483                    "symbols scope requires at least one symbol anchor",
484                ));
485            }
486            let symbols = list
487                .symbols
488                .iter()
489                .map(|s| SymbolAnchor::new(s.file.clone(), s.symbol.clone()))
490                .collect();
491            Ok(ReviewScope::Symbols(symbols))
492        }
493    }
494}
495
496fn object_scope_to_proto(scope: &ReviewScope) -> ProtoReviewScope {
497    use grpc::heddle::v1::review_scope::{Scope, SymbolList, WholeChange};
498    let inner = match scope {
499        ReviewScope::WholeChange => Scope::WholeChange(WholeChange {}),
500        ReviewScope::Symbols(symbols) => Scope::Symbols(SymbolList {
501            symbols: symbols
502                .iter()
503                .map(|s| ProtoPathSymbolRef {
504                    file: s.file.clone(),
505                    symbol: s.symbol.clone(),
506                })
507                .collect(),
508        }),
509    };
510    ProtoReviewScope { scope: Some(inner) }
511}
512
513fn review_signature_to_proto(sig: ReviewSignature, signature_id: String) -> ProtoReviewSignature {
514    ProtoReviewSignature {
515        signature_id,
516        actor_name: sig.actor.name.clone(),
517        actor_email: sig.actor.email.clone(),
518        kind: review_kind_to_proto(sig.kind) as i32,
519        scope: Some(object_scope_to_proto(&sig.scope)),
520        justification: sig.justification.unwrap_or_default(),
521        signed_at: Some(prost_types::Timestamp {
522            seconds: sig.signed_at,
523            nanos: 0,
524        }),
525        algorithm: sig.algorithm,
526        public_key: hex::decode(&sig.public_key).unwrap_or_default(),
527        signature: hex::decode(&sig.signature).unwrap_or_default(),
528    }
529}
530
531fn review_kind_to_proto(kind: ReviewKind) -> grpc::heddle::v1::ReviewKind {
532    match kind {
533        ReviewKind::Read => grpc::heddle::v1::ReviewKind::Read,
534        ReviewKind::AgentPreview => grpc::heddle::v1::ReviewKind::AgentPreview,
535        ReviewKind::AgentCoReview => grpc::heddle::v1::ReviewKind::AgentCoReview,
536    }
537}
538
539fn risk_signal_to_proto(sig: objects::object::RiskSignal, visibility: &str) -> ProtoRiskSignal {
540    let (start_line, end_line) = sig.anchor.line_range.unwrap_or((0, 0));
541    ProtoRiskSignal {
542        kind: sig.kind.as_str().to_string(),
543        anchor: Some(ProtoSignalAnchor {
544            file: sig.anchor.file,
545            symbol: sig.anchor.symbol.unwrap_or_default(),
546            start_line,
547            end_line,
548        }),
549        reason: sig.reason,
550        producer_module: sig.producer.module,
551        producer_version: sig.producer.version,
552        computed_at: Some(prost_types::Timestamp {
553            seconds: sig.computed_at,
554            nanos: 0,
555        }),
556        visibility: visibility.to_string(),
557    }
558}
559
560// ---------------------------------------------------------------------------
561// Symbol extraction + discussion projection (mirrors hosted impl).
562// ---------------------------------------------------------------------------
563
564/// Symbol projection for the reading-order partition. Mirrors the hosted-handler
565/// implementation: when the `semantic` feature is enabled and the
566/// changed file has a tree-sitter parser, emits one [`PathSymbol`] per
567/// definition. Otherwise falls back to a single path-only entry per
568/// changed file (kind = `Other`), routed by the path classifier.
569fn changed_files_as_symbols(
570    repo: &Repository,
571    state: &State,
572) -> objects::error::Result<Vec<PathSymbol>> {
573    let new_tree = match repo.store().get_tree(&state.tree)? {
574        Some(t) => t,
575        None => return Ok(Vec::new()),
576    };
577    let parent_tree = if let Some(parent_id) = state.parents.first() {
578        repo.store()
579            .get_state(parent_id)?
580            .and_then(|p| repo.store().get_tree(&p.tree).ok().flatten())
581    } else {
582        None
583    };
584    let new_files = collect_files(repo, &new_tree, "")?;
585    let parent_files = match parent_tree {
586        Some(t) => collect_files(repo, &t, "")?,
587        None => std::collections::HashMap::new(),
588    };
589
590    let mut out: Vec<PathSymbol> = Vec::new();
591    for (path, hash) in &new_files {
592        let changed = parent_files.get(path).map(|h| h != hash).unwrap_or(true);
593        if !changed {
594            continue;
595        }
596        #[cfg_attr(not(feature = "semantic"), allow(unused_mut))]
597        let mut emitted_any = false;
598        #[cfg(feature = "semantic")]
599        {
600            if let Some(blob) = repo.store().get_blob(hash)? {
601                emitted_any = extract_file_symbols(path, blob.content(), &mut out);
602            }
603        }
604        let _ = hash;
605        if !emitted_any {
606            out.push(PathSymbol {
607                file: path.clone(),
608                symbol: path.clone(),
609                kind: SymbolKind::Other,
610            });
611        }
612    }
613    Ok(out)
614}
615
/// Append one [`PathSymbol`] per definition found in `source` to `out`.
///
/// Returns `true` when at least one symbol was appended. Returns `false`
/// (leaving `out` untouched) when extraction fails or finds no
/// definitions, so the caller can fall back to a path-only entry.
/// Definitions with a non-empty parent are named `parent::name`.
#[cfg(feature = "semantic")]
fn extract_file_symbols(path: &str, source: &[u8], out: &mut Vec<PathSymbol>) -> bool {
    use ::semantic::symbol_resolver::{Definition, DefinitionKind, extract_definitions};

    let Ok(definitions) = extract_definitions(source, std::path::Path::new(path)) else {
        return false;
    };
    if definitions.is_empty() {
        return false;
    }

    out.extend(definitions.into_iter().map(|definition: Definition| {
        let kind = match definition.kind {
            DefinitionKind::Type => SymbolKind::Type,
            DefinitionKind::Trait => SymbolKind::Trait,
            DefinitionKind::Class => SymbolKind::Class,
            DefinitionKind::Interface => SymbolKind::Interface,
            DefinitionKind::TypeAlias => SymbolKind::TypeAlias,
            DefinitionKind::EnumDef => SymbolKind::EnumDef,
            DefinitionKind::ConstDecl => SymbolKind::ConstDecl,
            DefinitionKind::Module => SymbolKind::Module,
            DefinitionKind::Function => SymbolKind::Function,
            DefinitionKind::Other => SymbolKind::Other,
        };
        // Qualify nested definitions with their parent's name.
        let symbol = match definition.parent_name.as_deref() {
            Some(parent) if !parent.is_empty() => format!("{parent}::{}", definition.name),
            _ => definition.name,
        };
        PathSymbol {
            file: path.to_string(),
            symbol,
            kind,
        }
    }));
    true
}
652
653fn collect_files(
654    repo: &Repository,
655    tree: &objects::object::Tree,
656    prefix: &str,
657) -> objects::error::Result<std::collections::HashMap<String, objects::object::ContentHash>> {
658    let mut out = std::collections::HashMap::new();
659    for entry in tree.entries() {
660        let path = if prefix.is_empty() {
661            entry.name.clone()
662        } else {
663            format!("{prefix}/{}", entry.name)
664        };
665        if entry.is_tree() {
666            if let Some(subtree) = repo.store().get_tree(&entry.hash)? {
667                let sub = collect_files(repo, &subtree, &path)?;
668                out.extend(sub);
669            }
670        } else {
671            out.insert(path, entry.hash);
672        }
673    }
674    Ok(out)
675}
676
677fn partition_to_proto(p: ReadingOrderPartition) -> ProtoReadingOrderPartition {
678    ProtoReadingOrderPartition {
679        structural: p.structural.iter().map(path_symbol_to_proto).collect(),
680        consequence: p.consequence.iter().map(path_symbol_to_proto).collect(),
681        tests_and_docs: p.tests_and_docs.iter().map(path_symbol_to_proto).collect(),
682    }
683}
684
685fn path_symbol_to_proto(p: &PathSymbol) -> ProtoPathSymbolRef {
686    ProtoPathSymbolRef {
687        file: p.file.clone(),
688        symbol: p.symbol.clone(),
689    }
690}
691
692fn discussion_to_anchored_proto(d: &Discussion) -> AnchoredDiscussion {
693    AnchoredDiscussion {
694        id: d.id.clone(),
695        anchor: Some(ProtoPathSymbolRef {
696            file: d.anchor.file.clone(),
697            symbol: d.anchor.symbol.clone(),
698        }),
699        opened_against_state: d.opened_against_state.as_bytes().to_vec(),
700        opened_at: Some(prost_types::Timestamp {
701            seconds: d.opened_at,
702            nanos: 0,
703        }),
704        turns: d
705            .turns
706            .iter()
707            .map(|t| grpc::heddle::v1::DiscussionTurn {
708                author_name: t.author.name.clone(),
709                author_email: t.author.email.clone(),
710                body: t.body.clone(),
711                posted_at: Some(prost_types::Timestamp {
712                    seconds: t.posted_at,
713                    nanos: 0,
714                }),
715            })
716            .collect(),
717        resolution: Some(discussion_resolution_to_proto(&d.resolution)),
718        body_changed_since_open: d.body_changed_since_open,
719        orphaned: d.orphaned,
720        visibility: d.visibility.as_str().to_string(),
721    }
722}
723
724fn discussion_resolution_to_proto(
725    resolution: &DiscussionResolution,
726) -> grpc::heddle::v1::DiscussionResolution {
727    use grpc::heddle::v1::discussion_resolution::{
728        Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
729    };
730    let state = match resolution {
731        DiscussionResolution::Open => State::Open(Open {}),
732        DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
733            State::IntoAnnotation(ResolvedIntoAnnotation {
734                annotation_id: annotation_id.clone(),
735            })
736        }
737        DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
738            state_id: state_id.as_bytes().to_vec(),
739        }),
740        DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
741            reason: reason.clone(),
742        }),
743    };
744    grpc::heddle::v1::DiscussionResolution { state: Some(state) }
745}
746
747// ---------------------------------------------------------------------------
748// Diff summary helpers (state.tree vs first parent's tree).
749// ---------------------------------------------------------------------------
750
751/// File-change kinds we surface in the diff summary signal anchors.
752/// Mirrors `objects::object::DiffKind` minus the `Unchanged` variant
753/// (we filter those out before constructing this).
754#[derive(Debug, Clone)]
755struct ChangedPath {
756    path: String,
757    kind: DiffKind,
758}
759
760impl ChangedPath {
761    fn kind_str(&self) -> &'static str {
762        match self.kind {
763            DiffKind::Added => "added",
764            DiffKind::Modified => "modified",
765            DiffKind::Deleted => "deleted",
766            DiffKind::Unchanged => "unchanged",
767        }
768    }
769}
770
/// Aggregated counts plus a path list, computed by diffing
/// `state.tree` against the first parent's tree (or empty when the
/// state is a root). When `state.parents` is empty every file in the
/// state's tree counts as added, which makes "first capture" reviews
/// non-empty too.
///
/// Produced by `compute_state_diff_summary` and consumed by
/// `get_review_payload`, which copies the counts into the review summary
/// and turns `changed_paths` into per-file signal anchors.
struct DiffSummary {
    // Total number of files changed; reported as "N files changed".
    files_changed: u32,
    // Per-kind file counts; together they select the `diff_summary.*`
    // signal kind (added_only / modified_only / deleted_only / empty / mixed).
    added_files: u32,
    modified_files: u32,
    deleted_files: u32,
    // Line totals, rendered as "+added/-removed" in the summary reason.
    added_lines: u32,
    removed_lines: u32,
    // The changed paths with their change kind.
    changed_paths: Vec<ChangedPath>,
}
786
787/// Compute a summary diff for `state` vs its first parent. Errors
788/// from the object store propagate; missing trees / blobs are skipped
789/// silently (treated as zero-change for that path) so a partially
790/// pruned object store never blocks the review surface. The
791/// distinction matters: missing-object errors must become zero (the
792/// summary is best-effort, callers want a payload they can render),
793/// but genuine I/O errors must still propagate so a corrupt store
794/// surfaces loudly instead of silently truncating the review.
795fn compute_state_diff_summary(
796    repo: &Repository,
797    state: &State,
798) -> objects::error::Result<DiffSummary> {
799    use objects::object::Tree;
800    let parent_tree_hash = if let Some(parent_id) = state.parents.first() {
801        match repo.store().get_state(parent_id)? {
802            Some(parent_state) => parent_state.tree,
803            None => Tree::new().hash(),
804        }
805    } else {
806        Tree::new().hash()
807    };
808
809    // Resolve both tree objects up front so the missing-tree case
810    // becomes a synthesized empty changeset rather than an error from
811    // the recursive diff. `get_tree` returns `Ok(None)` for missing
812    // (not an error), and propagates only on genuine I/O — matching
813    // the policy the doc-comment claims.
814    let parent_tree_obj = repo.store().get_tree(&parent_tree_hash)?;
815    let new_tree_obj = repo.store().get_tree(&state.tree)?;
816
817    // If either tree is missing from the local store the diff is not
818    // meaningful — return an empty summary instead of erroring out.
819    // This mirrors the "Modified branch tolerates missing blobs" stance
820    // for the *tree* level: a partially pruned store should never block
821    // review payload retrieval, only render an empty summary.
822    let changes = if parent_tree_obj.is_some() && new_tree_obj.is_some() {
823        repo.diff_trees(&parent_tree_hash, &state.tree)?
824    } else {
825        objects::object::FileChangeSet::new()
826    };
827
828    // Compute per-file line deltas. We only count `Modified` here for
829    // the symmetric add/remove totals; `Added` files contribute every
830    // line as an add, and `Deleted` files contribute every line as a
831    // remove. Files with non-utf8 contents (e.g. binaries) silently
832    // contribute zero — `diff_blobs` already returns an empty vec in
833    // that case, and we mirror the same behavior for raw line counts.
834    let mut added_lines: u32 = 0;
835    let mut removed_lines: u32 = 0;
836    let mut changed_paths: Vec<ChangedPath> = Vec::with_capacity(changes.len());
837
838    let parent_files = match parent_tree_obj.as_ref() {
839        Some(t) => collect_files(repo, t, "")?,
840        None => std::collections::HashMap::new(),
841    };
842    let new_files = match new_tree_obj.as_ref() {
843        Some(t) => collect_files(repo, t, "")?,
844        None => std::collections::HashMap::new(),
845    };
846
847    let mut added_files: u32 = 0;
848    let mut modified_files: u32 = 0;
849    let mut deleted_files: u32 = 0;
850
851    for change in changes.iter() {
852        match change.kind {
853            DiffKind::Added => {
854                added_files += 1;
855                // Missing blob (`get_blob` returns `Ok(None)`) → file
856                // counts but contributes zero lines. Genuine I/O
857                // errors still propagate via `?` — same shape as the
858                // Modified branch's intent, but here we keep the
859                // distinction explicit so a corrupt store surfaces
860                // rather than getting silently swallowed.
861                if let Some(hash) = new_files.get(&change.path)
862                    && let Some(blob) = repo.store().get_blob(hash)?
863                {
864                    added_lines = added_lines.saturating_add(line_count(blob.content()));
865                }
866            }
867            DiffKind::Deleted => {
868                deleted_files += 1;
869                if let Some(hash) = parent_files.get(&change.path)
870                    && let Some(blob) = repo.store().get_blob(hash)?
871                {
872                    removed_lines = removed_lines.saturating_add(line_count(blob.content()));
873                }
874            }
875            DiffKind::Modified => {
876                modified_files += 1;
877                // `get_blob` already returns `Ok(None)` for a missing
878                // blob, so `?` here only fires on genuine I/O. Match
879                // the Added/Deleted branches' propagation policy
880                // explicitly instead of the older `.ok().flatten()`
881                // form, which silently swallowed IO errors and
882                // conflated them with "missing".
883                let old_blob = match parent_files.get(&change.path) {
884                    Some(h) => repo.store().get_blob(h)?,
885                    None => None,
886                };
887                let new_blob = match new_files.get(&change.path) {
888                    Some(h) => repo.store().get_blob(h)?,
889                    None => None,
890                };
891                if let (Some(old), Some(new)) = (old_blob, new_blob) {
892                    for line in diff_blobs(&old, &new) {
893                        match line {
894                            objects::worktree::DiffLine::Added(_) => {
895                                added_lines = added_lines.saturating_add(1);
896                            }
897                            objects::worktree::DiffLine::Removed(_) => {
898                                removed_lines = removed_lines.saturating_add(1);
899                            }
900                            objects::worktree::DiffLine::Context(_) => {}
901                        }
902                    }
903                }
904            }
905            DiffKind::Unchanged => continue,
906        }
907        changed_paths.push(ChangedPath {
908            path: change.path.clone(),
909            kind: change.kind,
910        });
911    }
912
913    Ok(DiffSummary {
914        files_changed: changed_paths.len() as u32,
915        added_files,
916        modified_files,
917        deleted_files,
918        added_lines,
919        removed_lines,
920        changed_paths,
921    })
922}
923
/// Count the number of newline-separated lines in a file blob.
///
/// - Empty content has zero lines.
/// - Non-UTF-8 (binary) content counts as zero; "lines" is meaningless
///   for binary data, so it is deliberately not byte-counted.
/// - A trailing newline terminates the last line rather than starting a
///   phantom empty one, while an unterminated final line still counts.
///
/// The result saturates at `u32::MAX` instead of wrapping, matching the
/// saturating arithmetic used by the diff summary that consumes it.
fn line_count(content: &[u8]) -> u32 {
    let Ok(text) = std::str::from_utf8(content) else {
        return 0;
    };
    if text.is_empty() {
        return 0;
    }

    let newlines = text.bytes().filter(|&byte| byte == b'\n').count();
    // Each newline closes one line; text left after the last newline is
    // one more (unterminated) line.
    let lines = if text.ends_with('\n') {
        newlines
    } else {
        newlines.saturating_add(1)
    };
    u32::try_from(lines).unwrap_or(u32::MAX)
}
941
942// ---------------------------------------------------------------------------
943// Tests
944// ---------------------------------------------------------------------------
945
946#[cfg(test)]
947mod tests {
948    use std::sync::Arc;
949
950    use crypto::Signer as _;
951    use grpc::heddle::v1::ReviewScope as ProtoReviewScope;
952    use repo::{Repository, operation_dedup::OperationDedupStore};
953    use tempfile::TempDir;
954
955    use super::*;
956
957    fn fresh_service() -> (LocalStateReviewService, Arc<Repository>, TempDir) {
958        let temp = TempDir::new().expect("create tempdir");
959        // SAFETY: tests run with a controlled environment; setting these
960        // env vars steers the default attribution into a predictable place.
961        // SAFETY: setting env vars in tests; rust 2024 marks these unsafe.
962        unsafe {
963            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
964            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
965        }
966        let repo = Repository::init_default(temp.path()).expect("init repo");
967        let dedup = OperationDedupStore::open(repo.heddle_dir()).expect("open dedup");
968        let repo = Arc::new(repo);
969        let svc =
970            LocalStateReviewService::new(GrpcLocalService::new(repo.clone(), Arc::new(dedup)));
971        (svc, repo, temp)
972    }
973
974    fn capture_state(repo: &Repository) -> ChangeId {
975        // Write a tiny file so snapshot has something to capture.
976        std::fs::write(repo.root().join("hello.txt"), b"hi").expect("write file");
977        let state = repo
978            .snapshot(Some("seed".to_string()), None)
979            .expect("snapshot");
980        state.change_id
981    }
982
983    fn sign_request(state_id: &ChangeId, op_id: &str) -> SignStateRequest {
984        let signer = crypto::Ed25519Signer::generate().expect("generate ed25519 key");
985        let scope = ReviewScope::WholeChange;
986        let signed_at = chrono::Utc::now().timestamp();
987        let payload = signing_payload(*state_id, ReviewKind::Read, &scope, signed_at, None);
988        let signature = signer.sign(&payload).expect("sign payload");
989        use grpc::heddle::v1::review_scope::{Scope, WholeChange};
990        SignStateRequest {
991            repo_path: String::new(),
992            state_id: state_id.as_bytes().to_vec(),
993            kind: grpc::heddle::v1::ReviewKind::Read as i32,
994            scope: Some(ProtoReviewScope {
995                scope: Some(Scope::WholeChange(WholeChange {})),
996            }),
997            justification: String::new(),
998            algorithm: "ed25519".to_string(),
999            public_key: signer.public_key().to_vec(),
1000            signature: signature.clone(),
1001            signed_at: Some(prost_types::Timestamp {
1002                seconds: signed_at,
1003                nanos: 0,
1004            }),
1005            client_operation_id: op_id.to_string(),
1006        }
1007    }
1008
1009    #[tokio::test]
1010    #[serial_test::serial(principal_env)]
1011    async fn sign_state_persists_to_review_signatures_blob() {
1012        let (svc, repo, _tmp) = fresh_service();
1013        let state_id = capture_state(&repo);
1014
1015        let resp = svc
1016            .sign_state(Request::new(sign_request(&state_id, "")))
1017            .await
1018            .expect("sign_state");
1019        assert!(!resp.get_ref().signature_id.is_empty());
1020        assert_eq!(resp.get_ref().state_id, state_id.as_bytes().to_vec());
1021
1022        let listing = svc
1023            .list_signatures(Request::new(ListSignaturesRequest {
1024                repo_path: String::new(),
1025                state_id: state_id.as_bytes().to_vec(),
1026            }))
1027            .await
1028            .expect("list_signatures");
1029        let sigs = &listing.get_ref().signatures;
1030        assert_eq!(sigs.len(), 1, "expected one signature, got {sigs:?}");
1031        assert_eq!(sigs[0].kind, grpc::heddle::v1::ReviewKind::Read as i32);
1032        assert_eq!(sigs[0].algorithm, "ed25519");
1033        assert_eq!(sigs[0].actor_name, "Alice Tester");
1034        assert_eq!(sigs[0].actor_email, "alice@example.com");
1035        let scope_case = sigs[0].scope.as_ref().and_then(|s| s.scope.as_ref());
1036        assert!(matches!(
1037            scope_case,
1038            Some(grpc::heddle::v1::review_scope::Scope::WholeChange(_))
1039        ));
1040    }
1041
1042    #[tokio::test]
1043    #[serial_test::serial(principal_env)]
1044    async fn sign_state_idempotent() {
1045        let (svc, repo, _tmp) = fresh_service();
1046        let state_id = capture_state(&repo);
1047        let op_id = objects::object::OperationId::new().to_string();
1048        // The second call must replay the *same* request body — fresh
1049        // signatures hash differently, so we build once and clone.
1050        let req = sign_request(&state_id, &op_id);
1051
1052        let first = svc
1053            .sign_state(Request::new(req.clone()))
1054            .await
1055            .expect("first sign_state");
1056        let second = svc
1057            .sign_state(Request::new(req))
1058            .await
1059            .expect("second sign_state");
1060        assert_eq!(
1061            first.get_ref().signature_id,
1062            second.get_ref().signature_id,
1063            "replayed call must return the same signature_id"
1064        );
1065
1066        let listing = svc
1067            .list_signatures(Request::new(ListSignaturesRequest {
1068                repo_path: String::new(),
1069                state_id: state_id.as_bytes().to_vec(),
1070            }))
1071            .await
1072            .expect("list_signatures");
1073        assert_eq!(
1074            listing.get_ref().signatures.len(),
1075            1,
1076            "idempotent replay must not append a duplicate signature"
1077        );
1078    }
1079
1080    #[tokio::test]
1081    #[serial_test::serial(principal_env)]
1082    async fn sign_state_rejects_forged_signature() {
1083        let (svc, repo, _tmp) = fresh_service();
1084        let state_id = capture_state(&repo);
1085        let mut req = sign_request(&state_id, "");
1086        // Flip the last byte of the signature so verification fails.
1087        let last = req.signature.len() - 1;
1088        req.signature[last] ^= 0xff;
1089
1090        let err = svc
1091            .sign_state(Request::new(req))
1092            .await
1093            .expect_err("forged signature must be rejected");
1094        assert_eq!(err.code(), tonic::Code::InvalidArgument, "{err:?}");
1095        assert!(
1096            err.message().contains("failed verification"),
1097            "unexpected error message: {}",
1098            err.message()
1099        );
1100    }
1101
1102    #[tokio::test]
1103    #[serial_test::serial(principal_env)]
1104    async fn sign_state_rejects_skewed_timestamp() {
1105        let (svc, repo, _tmp) = fresh_service();
1106        let state_id = capture_state(&repo);
1107        let mut req = sign_request(&state_id, "");
1108        // Timestamp 1 hour in the future is well outside the skew window.
1109        if let Some(ts) = req.signed_at.as_mut() {
1110            ts.seconds += 60 * 60;
1111        }
1112
1113        let err = svc
1114            .sign_state(Request::new(req))
1115            .await
1116            .expect_err("skewed timestamp must be rejected");
1117        assert_eq!(err.code(), tonic::Code::InvalidArgument);
1118        assert!(err.message().contains("too far from server time"));
1119    }
1120
1121    #[tokio::test]
1122    #[serial_test::serial(principal_env)]
1123    async fn sign_state_attributes_to_caller_not_state_author() {
1124        // Regression for the codex-flagged bug: sign_state used to
1125        // attribute the signature to the state's author. Bob signing
1126        // Alice's state would record Alice. The signature must record
1127        // the *caller*. In local mode the caller is resolved via
1128        // `Repository::get_principal()` (env vars then config).
1129        let (svc, repo, _tmp) = fresh_service();
1130        // `fresh_service` already set the env to Alice; capture the
1131        // state under Alice's identity so `state.attribution.principal`
1132        // is Alice.
1133        let state_id = capture_state(&repo);
1134
1135        // Now switch the local user to Bob and sign Alice's state.
1136        // SAFETY: tests run with a controlled environment.
1137        unsafe {
1138            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Bob Signer");
1139            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "bob@example.com");
1140        }
1141
1142        svc.sign_state(Request::new(sign_request(&state_id, "")))
1143            .await
1144            .expect("sign_state");
1145
1146        let listing = svc
1147            .list_signatures(Request::new(ListSignaturesRequest {
1148                repo_path: String::new(),
1149                state_id: state_id.as_bytes().to_vec(),
1150            }))
1151            .await
1152            .expect("list_signatures");
1153        let sigs = &listing.get_ref().signatures;
1154        assert_eq!(sigs.len(), 1);
1155        assert_eq!(
1156            sigs[0].actor_name, "Bob Signer",
1157            "signature must attribute to the caller (Bob), not the state author (Alice)"
1158        );
1159        assert_eq!(sigs[0].actor_email, "bob@example.com");
1160
1161        // Restore the env so other serial tests see the expected
1162        // Alice baseline.
1163        unsafe {
1164            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
1165            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
1166        }
1167    }
1168
1169    #[tokio::test]
1170    #[serial_test::serial(principal_env)]
1171    async fn sign_state_serializes_concurrent_appends() {
1172        // Regression for the codex-flagged race: two SignStates with
1173        // different operation ids could both read the same base
1174        // `review_signatures` blob, then the second `put_state` would
1175        // clobber the first signature. The fix wraps the
1176        // read-modify-write in `repo.locker().write()` and re-loads
1177        // the state inside the lock.
1178        let (svc, repo, _tmp) = fresh_service();
1179        let state_id = capture_state(&repo);
1180
1181        // Two distinct ops so neither replays the other through dedup.
1182        let op_a = objects::object::OperationId::new().to_string();
1183        let op_b = objects::object::OperationId::new().to_string();
1184        let req_a = sign_request(&state_id, &op_a);
1185        let req_b = sign_request(&state_id, &op_b);
1186
1187        let svc_a = svc.clone();
1188        let svc_b = svc.clone();
1189        let (a, b) = tokio::join!(
1190            svc_a.sign_state(Request::new(req_a)),
1191            svc_b.sign_state(Request::new(req_b)),
1192        );
1193        a.expect("first sign_state");
1194        b.expect("second sign_state");
1195
1196        let listing = svc
1197            .list_signatures(Request::new(ListSignaturesRequest {
1198                repo_path: String::new(),
1199                state_id: state_id.as_bytes().to_vec(),
1200            }))
1201            .await
1202            .expect("list_signatures");
1203        assert_eq!(
1204            listing.get_ref().signatures.len(),
1205            2,
1206            "both concurrent signatures must land — neither should be lost \
1207             to a stale-blob clobber"
1208        );
1209    }
1210
1211    /// Regression: `get_review_payload` previously returned
1212    /// `summary.files_changed = 0` and empty `in_budget_signals` for
1213    /// every state, even when the diff against the parent had real
1214    /// content. This test snapshots once (root state — every file is
1215    /// "added"), then snapshots again with a modification, and asserts
1216    /// both states report a non-empty diff summary plus a populated
1217    /// `diff_summary` signal anchored on the changed file.
1218    #[tokio::test]
1219    #[serial_test::serial(principal_env)]
1220    async fn get_review_payload_populates_diff_summary_and_signals() {
1221        let (svc, repo, _tmp) = fresh_service();
1222
1223        // First capture: root state, one file added. Every line of
1224        // `hello.txt` should count as added.
1225        std::fs::write(repo.root().join("hello.txt"), b"first\nsecond\nthird\n")
1226            .expect("write hello.txt");
1227        let first = repo
1228            .snapshot(Some("first capture".to_string()), None)
1229            .expect("first snapshot");
1230
1231        let resp_first = svc
1232            .get_review_payload(Request::new(GetReviewPayloadRequest {
1233                repo_path: String::new(),
1234                state_id: first.change_id.as_bytes().to_vec(),
1235                include_all_signals: false,
1236            }))
1237            .await
1238            .expect("get_review_payload first");
1239        let payload_first = resp_first.into_inner();
1240        let summary_first = payload_first.summary.as_ref().expect("summary present");
1241        assert!(
1242            summary_first.files_changed >= 1,
1243            "first state should report at least one file changed (vs empty parent), got {}",
1244            summary_first.files_changed
1245        );
1246        assert!(
1247            summary_first.added_lines >= 3,
1248            "first state should report 3+ added lines, got {}",
1249            summary_first.added_lines
1250        );
1251        assert!(
1252            !payload_first.in_budget_signals.is_empty(),
1253            "in_budget_signals must include a diff_summary entry"
1254        );
1255        let first_signal = &payload_first.in_budget_signals[0];
1256        assert!(
1257            first_signal.kind.starts_with("diff_summary."),
1258            "expected synthetic diff_summary signal kind, got {}",
1259            first_signal.kind
1260        );
1261        assert_eq!(first_signal.producer_module, "review_show.diff_summary");
1262        assert_eq!(first_signal.visibility, "visible");
1263
1264        // Second capture: modify the file. Diff vs the first state's
1265        // tree should report a single modified file with at least one
1266        // added and one removed line.
1267        std::fs::write(
1268            repo.root().join("hello.txt"),
1269            b"first\nsecond\nthird\nfourth\n",
1270        )
1271        .expect("modify hello.txt");
1272        let second = repo
1273            .snapshot(Some("second capture".to_string()), None)
1274            .expect("second snapshot");
1275
1276        let resp_second = svc
1277            .get_review_payload(Request::new(GetReviewPayloadRequest {
1278                repo_path: String::new(),
1279                state_id: second.change_id.as_bytes().to_vec(),
1280                include_all_signals: false,
1281            }))
1282            .await
1283            .expect("get_review_payload second");
1284        let payload_second = resp_second.into_inner();
1285        let summary_second = payload_second.summary.as_ref().expect("summary present");
1286        assert_eq!(
1287            summary_second.files_changed, 1,
1288            "second state should report exactly one modified file"
1289        );
1290        assert!(
1291            summary_second.added_lines >= 1,
1292            "second state should report at least one added line, got {}",
1293            summary_second.added_lines
1294        );
1295        assert!(
1296            !payload_second.in_budget_signals.is_empty(),
1297            "in_budget_signals must include a diff_summary entry"
1298        );
1299        let signal = &payload_second.in_budget_signals[0];
1300        assert_eq!(
1301            signal
1302                .anchor
1303                .as_ref()
1304                .map(|a| a.file.as_str())
1305                .unwrap_or(""),
1306            "hello.txt",
1307            "diff_summary signal should anchor on the modified file"
1308        );
1309        assert!(
1310            signal.reason.contains("files changed"),
1311            "first signal reason should carry the aggregate summary, got {}",
1312            signal.reason
1313        );
1314        // The summary's signal-count fields should reflect the visible
1315        // budget so consumers can short-circuit on empty-vs-populated
1316        // without re-counting the array.
1317        assert_eq!(
1318            summary_second.in_budget_signal_count,
1319            payload_second.in_budget_signals.len() as u32,
1320            "in_budget_signal_count must match the array length"
1321        );
1322    }
1323
1324    /// Regression for codex feedback on PRs #52 (tree fallback) and
1325    /// #56 (blob fallback): `compute_state_diff_summary` must tolerate
1326    /// missing trees and blobs by returning an empty/partial summary
1327    /// rather than blocking the entire review payload. Construct a
1328    /// state whose `tree` points to a content hash that was never
1329    /// stored — `get_tree` returns `Ok(None)`, and the function must
1330    /// fall back to an empty changeset instead of erroring out of
1331    /// `diff_trees`. (Pre-fix, the Modified branch already tolerated
1332    /// missing blobs via `.ok().flatten()`, but the Added/Deleted
1333    /// branches and the top-level diff_trees call did not — surfaces
1334    /// of pruned object stores all errored out inconsistently.)
1335    #[tokio::test]
1336    #[serial_test::serial(principal_env)]
1337    async fn get_review_payload_tolerates_missing_tree() {
1338        let (svc, repo, _tmp) = fresh_service();
1339        let state_id = capture_state(&repo);
1340
1341        // Load the state, swap its tree pointer to a synthetic hash
1342        // that nothing references, and re-persist. The state object
1343        // itself survives, but `repo.store().get_tree(state.tree)`
1344        // will return `Ok(None)` — the missing-tree case.
1345        let state = repo
1346            .store()
1347            .get_state(&state_id)
1348            .expect("get state")
1349            .expect("state present");
1350        let bogus_tree = objects::object::ContentHash::compute(b"definitely-not-in-store-bytes");
1351        let mut mutated = state.clone();
1352        mutated.tree = bogus_tree;
1353        repo.store().put_state(&mutated).expect("put mutated state");
1354
1355        // The review payload must still come back — empty summary,
1356        // but no Status::Internal error from the gRPC layer.
1357        let resp = svc
1358            .get_review_payload(Request::new(GetReviewPayloadRequest {
1359                repo_path: String::new(),
1360                state_id: state_id.as_bytes().to_vec(),
1361                include_all_signals: false,
1362            }))
1363            .await
1364            .expect("missing tree must not block review payload");
1365        let payload = resp.into_inner();
1366        let summary = payload.summary.as_ref().expect("summary present");
1367        assert_eq!(
1368            summary.files_changed, 0,
1369            "missing tree must produce a zero-change summary, got {} files",
1370            summary.files_changed
1371        );
1372        // Synthetic diff_summary signal should still be present (with
1373        // the `empty` kind) so consumers always see at least one
1374        // signal — keeps the wire shape stable.
1375        assert!(
1376            !payload.in_budget_signals.is_empty(),
1377            "in_budget_signals should always contain at least the synthetic diff_summary entry"
1378        );
1379        let kind = &payload.in_budget_signals[0].kind;
1380        assert!(
1381            kind.starts_with("diff_summary."),
1382            "expected synthetic diff_summary signal, got {kind}"
1383        );
1384    }
1385
1386    /// `line_count` should match git-style line counts — trailing
1387    /// newline never produces a phantom empty line, but an unterminated
1388    /// final line still counts.
1389    #[test]
1390    fn line_count_matches_git_semantics() {
1391        assert_eq!(line_count(b""), 0);
1392        assert_eq!(line_count(b"\n"), 1);
1393        assert_eq!(line_count(b"hello"), 1);
1394        assert_eq!(line_count(b"hello\n"), 1);
1395        assert_eq!(line_count(b"hello\nworld"), 2);
1396        assert_eq!(line_count(b"hello\nworld\n"), 2);
1397        assert_eq!(line_count(b"a\nb\nc\n"), 3);
1398        // Non-utf8 bytes count as zero (treated as binary).
1399        assert_eq!(line_count(&[0xff, 0xfe, 0xfd]), 0);
1400    }
1401}