Skip to main content

daemon/grpc_local_impl/
state_review.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local-mode implementation of the [`StateReviewService`] gRPC contract.
3//!
4//! Reads / writes the [`ReviewSignaturesBlob`] persisted at
5//! [`State::review_signatures`]. Verifies the client-supplied signature
6//! against the deterministic [`signing_payload`] before persisting.
7
8// `::state_review` disambiguates from this module's own name
9// (`grpc_local_impl::state_review`), the same way the hosted impl
10// disambiguates by being in a sibling module.
11use ::state_review::{
12    PathSymbol, ReadingOrderPartition, SymbolKind, build_review_payload_partition,
13};
14use crypto::verify_payload_signature;
15use grpc::heddle::v1::{
16    AnchoredDiscussion, GetReviewPayloadRequest, ListSignaturesRequest, ListSignaturesResponse,
17    MergeRequirement, PathSymbolRef as ProtoPathSymbolRef,
18    ReadingOrderPartition as ProtoReadingOrderPartition, ReviewPayload,
19    ReviewScope as ProtoReviewScope, ReviewSignature as ProtoReviewSignature, ReviewSummary,
20    RiskSignal as ProtoRiskSignal, SignStateRequest, SignStateResponse,
21    SignalAnchor as ProtoSignalAnchor, SigningFooter,
22    state_review_service_server::StateReviewService,
23};
24use objects::{
25    lock::RepositoryLockExt,
26    object::{
27        Blob, ChangeId, DiffKind, Discussion, DiscussionResolution, DiscussionsBlob, ReviewKind,
28        ReviewScope, ReviewSignature, ReviewSignaturesBlob, RiskSignalBlob, State, SymbolAnchor,
29        signing_payload,
30    },
31    store::ObjectStore,
32    worktree::diff_blobs,
33};
34use prost::Message;
35use repo::Repository;
36use tonic::{Request, Response, Status};
37
38use super::{GrpcLocalService, to_status, with_idempotency};
39
40/// Maximum drift (seconds) between the client's `signed_at_unix` and the
41/// server's wall clock. Generous enough to absorb NTP skew, narrow enough
42/// to bound the window for replay-style attacks.
43const SIGN_TIMESTAMP_SKEW_SECS: i64 = 5 * 60;
44
45/// Local-mode `StateReviewService` implementation.
46#[derive(Clone)]
47pub struct LocalStateReviewService {
48    inner: GrpcLocalService,
49}
50
51impl LocalStateReviewService {
52    pub fn new(inner: GrpcLocalService) -> Self {
53        Self { inner }
54    }
55}
56
57#[tonic::async_trait]
58impl StateReviewService for LocalStateReviewService {
59    async fn get_review_payload(
60        &self,
61        request: Request<GetReviewPayloadRequest>,
62    ) -> Result<Response<ReviewPayload>, Status> {
63        let req = request.into_inner();
64        let change_id = parse_change_id(&req.state_id)?;
65        let repo = self.inner.repo();
66        let state = repo
67            .store()
68            .get_state(&change_id)
69            .map_err(to_status)?
70            .ok_or_else(|| {
71                Status::not_found(format!("state {} not found", change_id.to_string_full()))
72            })?;
73
74        // Diff the state's tree against its first parent so the summary
75        // counts reflect what actually changed in this state. The
76        // signal registry / budgeter will eventually layer on top of
77        // this; until then `files_changed` is the most useful single
78        // number an agent can use for self-review.
79        let diff_summary = compute_state_diff_summary(repo, &state).map_err(to_status)?;
80
81        let summary = ReviewSummary {
82            headline: state.intent.clone().unwrap_or_default(),
83            files_changed: diff_summary.files_changed,
84            added_lines: diff_summary.added_lines,
85            removed_lines: diff_summary.removed_lines,
86            in_budget_signal_count: 0,
87            hidden_signal_count: 0,
88        };
89
90        let agent_narrative = if state.attribution.agent.is_some() {
91            state.intent.clone().unwrap_or_default()
92        } else {
93            String::new()
94        };
95
96        // Surface fired risk signals if requested. The signal registry will
97        // replace this with a proper budget split; until then everything we
98        // read is "visible".
99        let mut all_signals: Vec<ProtoRiskSignal> = Vec::new();
100        if req.include_all_signals
101            && let Some(hash) = state.risk_signals
102            && let Some(blob) = repo.store().get_blob(&hash).map_err(to_status)?
103        {
104            let decoded = RiskSignalBlob::decode(blob.content())
105                .map_err(|err| Status::internal(format!("decode risk signals: {err}")))?;
106            all_signals = decoded
107                .signals
108                .into_iter()
109                .map(|s| risk_signal_to_proto(s, "visible"))
110                .collect();
111        }
112
113        // Synthesize a structured `diff_summary` signal so the
114        // `in_budget_signals` array is non-empty even before the real
115        // signal registry is wired up. Anchored on each modified
116        // file (capped) so consumers can iterate without losing the
117        // summary aggregate. This is a deliberate stable shape: agents
118        // already iterating signals get a usable record per file
119        // change, and the registry-driven path will simply layer real
120        // signals alongside it.
121        let mut in_budget_signals: Vec<ProtoRiskSignal> = Vec::new();
122        let summary_kind = match (
123            diff_summary.added_files,
124            diff_summary.modified_files,
125            diff_summary.deleted_files,
126        ) {
127            (a, 0, 0) if a > 0 => "diff_summary.added_only",
128            (0, m, 0) if m > 0 => "diff_summary.modified_only",
129            (0, 0, d) if d > 0 => "diff_summary.deleted_only",
130            (0, 0, 0) => "diff_summary.empty",
131            _ => "diff_summary.mixed",
132        };
133        let summary_reason = format!(
134            "{} files changed (+{}/-{}, {} added, {} modified, {} deleted)",
135            diff_summary.files_changed,
136            diff_summary.added_lines,
137            diff_summary.removed_lines,
138            diff_summary.added_files,
139            diff_summary.modified_files,
140            diff_summary.deleted_files,
141        );
142        // Per-file anchors keep the array reasoning-friendly when
143        // many files change, but cap so very large diffs don't bloat
144        // the payload. The aggregate summary always rides on the
145        // first entry's reason field; the rest carry per-file deltas.
146        const MAX_DIFF_SIGNAL_ANCHORS: usize = 32;
147        if diff_summary.changed_paths.is_empty() {
148            in_budget_signals.push(ProtoRiskSignal {
149                kind: summary_kind.to_string(),
150                anchor: Some(ProtoSignalAnchor {
151                    file: String::new(),
152                    symbol: String::new(),
153                    start_line: 0,
154                    end_line: 0,
155                }),
156                reason: summary_reason.clone(),
157                producer_module: "review_show.diff_summary".to_string(),
158                producer_version: 1,
159                computed_at: None,
160                visibility: "visible".to_string(),
161            });
162        } else {
163            for (idx, path_kind) in diff_summary
164                .changed_paths
165                .iter()
166                .take(MAX_DIFF_SIGNAL_ANCHORS)
167                .enumerate()
168            {
169                let reason = if idx == 0 {
170                    summary_reason.clone()
171                } else {
172                    format!("{} ({})", path_kind.path, path_kind.kind_str())
173                };
174                in_budget_signals.push(ProtoRiskSignal {
175                    kind: summary_kind.to_string(),
176                    anchor: Some(ProtoSignalAnchor {
177                        file: path_kind.path.clone(),
178                        symbol: String::new(),
179                        start_line: 0,
180                        end_line: 0,
181                    }),
182                    reason,
183                    producer_module: "review_show.diff_summary".to_string(),
184                    producer_version: 1,
185                    computed_at: None,
186                    visibility: "visible".to_string(),
187                });
188            }
189        }
190
191        // Server-side reading-order partition. Same per-symbol
192        // extraction logic as the hosted handler: tree-sitter when the
193        // `semantic` feature is enabled, path-only fallback otherwise.
194        let symbols = changed_files_as_symbols(repo, &state, &diff_summary.changed_paths)
195            .map_err(to_status)?;
196        let partition = build_review_payload_partition(&symbols);
197
198        // Project the state's `DiscussionsBlob` (when present)
199        // into the wire-shape `AnchoredDiscussion` list.
200        let discussions = match state.discussions {
201            Some(hash) => {
202                let blob = repo
203                    .store()
204                    .get_blob(&hash)
205                    .map_err(to_status)?
206                    .ok_or_else(|| {
207                        Status::internal(format!(
208                            "discussions blob {} referenced by state {} is missing",
209                            hash,
210                            state.change_id.to_string_full()
211                        ))
212                    })?;
213                let decoded = DiscussionsBlob::decode(blob.content())
214                    .map_err(|err| Status::internal(format!("decode discussions: {err}")))?;
215                decoded
216                    .discussions
217                    .iter()
218                    .map(discussion_to_anchored_proto)
219                    .collect()
220            }
221            None => Vec::<AnchoredDiscussion>::new(),
222        };
223
224        let mut summary = summary;
225        summary.in_budget_signal_count = in_budget_signals.len() as u32;
226        summary.hidden_signal_count =
227            all_signals.len().saturating_sub(in_budget_signals.len()) as u32;
228
229        let payload = ReviewPayload {
230            state_id: req.state_id.clone(),
231            summary: Some(summary),
232            agent_narrative,
233            partition: Some(partition_to_proto(partition)),
234            in_budget_signals,
235            all_signals,
236            tick_budget: 3,
237            discussions,
238            // Local mode has no policy registry — merge requirements
239            // are surfaced only by the hosted handler.
240            merge_requirements: Vec::<MergeRequirement>::new(),
241            signing_footer: Some(SigningFooter {
242                available_kinds: vec![
243                    grpc::heddle::v1::ReviewKind::Read as i32,
244                    grpc::heddle::v1::ReviewKind::AgentPreview as i32,
245                    grpc::heddle::v1::ReviewKind::AgentCoReview as i32,
246                ],
247            }),
248        };
249
250        Ok(Response::new(payload))
251    }
252
253    async fn sign_state(
254        &self,
255        request: Request<SignStateRequest>,
256    ) -> Result<Response<SignStateResponse>, Status> {
257        let req = request.into_inner();
258        let req_bytes = req.encode_to_vec();
259        let client_operation_id = req.client_operation_id.clone();
260        let inner = self.inner.clone();
261
262        let response = with_idempotency(
263            &self.inner,
264            &client_operation_id,
265            "state_review.sign_state",
266            &req_bytes,
267            move || {
268                let inner = inner.clone();
269                async move { execute_sign_state(&inner, req).await }
270            },
271        )
272        .await?;
273
274        Ok(Response::new(response))
275    }
276
277    async fn list_signatures(
278        &self,
279        request: Request<ListSignaturesRequest>,
280    ) -> Result<Response<ListSignaturesResponse>, Status> {
281        let req = request.into_inner();
282        let change_id = parse_change_id(&req.state_id)?;
283        let repo = self.inner.repo();
284        let state = repo
285            .store()
286            .get_state(&change_id)
287            .map_err(to_status)?
288            .ok_or_else(|| {
289                Status::not_found(format!("state {} not found", change_id.to_string_full()))
290            })?;
291
292        let signatures = match state.review_signatures {
293            Some(hash) => {
294                let blob = repo
295                    .store()
296                    .get_blob(&hash)
297                    .map_err(to_status)?
298                    .ok_or_else(|| {
299                        Status::internal(format!(
300                            "review signatures blob {} missing from object store",
301                            hash
302                        ))
303                    })?;
304                let decoded = ReviewSignaturesBlob::decode(blob.content())
305                    .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?;
306                decoded
307                    .signatures
308                    .into_iter()
309                    .enumerate()
310                    .map(|(idx, sig)| review_signature_to_proto(sig, synthetic_signature_id(idx)))
311                    .collect()
312            }
313            None => Vec::new(),
314        };
315
316        Ok(Response::new(ListSignaturesResponse { signatures }))
317    }
318}
319
320/// Body of [`LocalStateReviewService::sign_state`]. Lifted out of the trait
321/// method so [`with_idempotency`] can re-execute it inside its closure.
322async fn execute_sign_state(
323    inner: &GrpcLocalService,
324    req: SignStateRequest,
325) -> Result<SignStateResponse, Status> {
326    // 1. Validate the kind.
327    let kind = match grpc::heddle::v1::ReviewKind::try_from(req.kind)
328        .map_err(|_| Status::invalid_argument(format!("unknown review kind tag {}", req.kind)))?
329    {
330        grpc::heddle::v1::ReviewKind::Read => ReviewKind::Read,
331        grpc::heddle::v1::ReviewKind::AgentPreview => ReviewKind::AgentPreview,
332        grpc::heddle::v1::ReviewKind::AgentCoReview => ReviewKind::AgentCoReview,
333        grpc::heddle::v1::ReviewKind::Unspecified => {
334            return Err(Status::invalid_argument("review kind is required"));
335        }
336    };
337
338    // 2. Locate the state.
339    let change_id = parse_change_id(&req.state_id)?;
340    let repo = inner.repo();
341
342    // 3. Translate the scope.
343    let scope = match req.scope.as_ref() {
344        Some(s) => proto_scope_to_object(s)?,
345        None => ReviewScope::WholeChange,
346    };
347
348    // 4. Build the ReviewSignature, then verify the client-supplied
349    // signature is well-formed and matches the deterministic signing
350    // payload. A malformed or forged signature must never reach the
351    // persisted blob. Attribute the signature to the local-mode
352    // caller (`Repository::get_principal` resolves env vars then
353    // `[principal]` in `.heddle/config.toml`), not the state's author
354    // — Bob signing Alice's state should record Bob.
355    let actor = repo
356        .get_principal()
357        .map_err(|err| Status::internal(format!("resolve caller principal: {err}")))?;
358    let justification = if req.justification.is_empty() {
359        None
360    } else {
361        Some(req.justification.clone())
362    };
363
364    let now = chrono::Utc::now().timestamp();
365    let signed_at = req.signed_at.as_ref().map(|t| t.seconds).unwrap_or(0);
366    if signed_at == 0 {
367        return Err(Status::invalid_argument(
368            "signed_at is required and must match the timestamp the client signed over",
369        ));
370    }
371    if (signed_at - now).abs() > SIGN_TIMESTAMP_SKEW_SECS {
372        return Err(Status::invalid_argument(format!(
373            "signed_at={signed_at} is too far from server time={now} (max skew {SIGN_TIMESTAMP_SKEW_SECS}s)"
374        )));
375    }
376
377    let new_sig = ReviewSignature {
378        actor,
379        kind,
380        scope: scope.clone(),
381        justification: justification.clone(),
382        signed_at,
383        algorithm: req.algorithm.clone(),
384        public_key: hex::encode(&req.public_key),
385        signature: hex::encode(&req.signature),
386    };
387    new_sig
388        .validate()
389        .map_err(|err| Status::invalid_argument(format!("invalid review signature: {err}")))?;
390
391    let public_key_bytes = req.public_key.clone();
392    let signature_bytes = req.signature.clone();
393    let payload = signing_payload(change_id, kind, &scope, signed_at, justification.as_deref());
394    verify_payload_signature(
395        &payload,
396        &req.algorithm,
397        &public_key_bytes,
398        &signature_bytes,
399    )
400    .map_err(|err| {
401        Status::invalid_argument(format!(
402            "review signature failed verification ({}): {err}",
403            req.algorithm
404        ))
405    })?;
406
407    // 5. Serialize the read-modify-write on `review_signatures`
408    // behind the repo write-lock and re-load the state inside the
409    // critical section. Two concurrent SignStates with different
410    // operation ids would otherwise both read the same base blob and
411    // the second `put_state` would clobber the first signature.
412    let _lock = repo
413        .locker()
414        .write()
415        .map_err(|err| Status::internal(err.to_string()))?;
416    let state = repo
417        .store()
418        .get_state(&change_id)
419        .map_err(to_status)?
420        .ok_or_else(|| {
421            Status::not_found(format!("state {} not found", change_id.to_string_full()))
422        })?;
423    let mut blob = match state.review_signatures {
424        Some(hash) => {
425            let raw = repo
426                .store()
427                .get_blob(&hash)
428                .map_err(to_status)?
429                .ok_or_else(|| {
430                    Status::internal(format!(
431                        "existing review signatures blob {} missing from object store",
432                        hash
433                    ))
434                })?;
435            ReviewSignaturesBlob::decode(raw.content())
436                .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?
437        }
438        None => ReviewSignaturesBlob::new(Vec::new()),
439    };
440    blob.signatures.push(new_sig);
441    let new_index = blob.signatures.len() - 1;
442
443    // 6. Persist the new blob.
444    let bytes = blob
445        .encode()
446        .map_err(|err| Status::internal(format!("encode review signatures: {err}")))?;
447    let content_hash = repo
448        .store()
449        .put_blob(&Blob::new(bytes))
450        .map_err(to_status)?;
451
452    // 7. Persist the updated state.
453    let new_state = state.with_review_signatures(content_hash);
454    repo.store().put_state(&new_state).map_err(to_status)?;
455
456    Ok(SignStateResponse {
457        signature_id: synthetic_signature_id(new_index),
458        state_id: req.state_id,
459    })
460}
461
462/// `ReviewSignature` doesn't carry an explicit id; we synthesise one from
463/// the per-state index so the wire surface has stable signature ids within a
464/// single state. (A future schema bump may add an explicit id.)
465fn synthetic_signature_id(index: usize) -> String {
466    format!("rs-{index}")
467}
468
469fn parse_change_id(s: &[u8]) -> Result<ChangeId, Status> {
470    ChangeId::try_from_slice(s)
471        .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))
472}
473
474fn proto_scope_to_object(scope: &ProtoReviewScope) -> Result<ReviewScope, Status> {
475    use grpc::heddle::v1::review_scope::Scope;
476    match scope.scope.as_ref() {
477        None | Some(Scope::WholeChange(_)) => Ok(ReviewScope::WholeChange),
478        Some(Scope::Symbols(list)) => {
479            if list.symbols.is_empty() {
480                return Err(Status::invalid_argument(
481                    "symbols scope requires at least one symbol anchor",
482                ));
483            }
484            let symbols = list
485                .symbols
486                .iter()
487                .map(|s| SymbolAnchor::new(s.file.clone(), s.symbol.clone()))
488                .collect();
489            Ok(ReviewScope::Symbols(symbols))
490        }
491    }
492}
493
494fn object_scope_to_proto(scope: &ReviewScope) -> ProtoReviewScope {
495    use grpc::heddle::v1::review_scope::{Scope, SymbolList, WholeChange};
496    let inner = match scope {
497        ReviewScope::WholeChange => Scope::WholeChange(WholeChange {}),
498        ReviewScope::Symbols(symbols) => Scope::Symbols(SymbolList {
499            symbols: symbols
500                .iter()
501                .map(|s| ProtoPathSymbolRef {
502                    file: s.file.clone(),
503                    symbol: s.symbol.clone(),
504                })
505                .collect(),
506        }),
507    };
508    ProtoReviewScope { scope: Some(inner) }
509}
510
511fn review_signature_to_proto(sig: ReviewSignature, signature_id: String) -> ProtoReviewSignature {
512    ProtoReviewSignature {
513        signature_id,
514        actor_name: sig.actor.name.clone(),
515        actor_email: sig.actor.email.clone(),
516        kind: review_kind_to_proto(sig.kind) as i32,
517        scope: Some(object_scope_to_proto(&sig.scope)),
518        justification: sig.justification.unwrap_or_default(),
519        signed_at: Some(prost_types::Timestamp {
520            seconds: sig.signed_at,
521            nanos: 0,
522        }),
523        algorithm: sig.algorithm,
524        public_key: hex::decode(&sig.public_key).unwrap_or_default(),
525        signature: hex::decode(&sig.signature).unwrap_or_default(),
526    }
527}
528
529fn review_kind_to_proto(kind: ReviewKind) -> grpc::heddle::v1::ReviewKind {
530    match kind {
531        ReviewKind::Read => grpc::heddle::v1::ReviewKind::Read,
532        ReviewKind::AgentPreview => grpc::heddle::v1::ReviewKind::AgentPreview,
533        ReviewKind::AgentCoReview => grpc::heddle::v1::ReviewKind::AgentCoReview,
534    }
535}
536
537fn risk_signal_to_proto(sig: objects::object::RiskSignal, visibility: &str) -> ProtoRiskSignal {
538    let (start_line, end_line) = sig.anchor.line_range.unwrap_or((0, 0));
539    ProtoRiskSignal {
540        kind: sig.kind.as_str().to_string(),
541        anchor: Some(ProtoSignalAnchor {
542            file: sig.anchor.file,
543            symbol: sig.anchor.symbol.unwrap_or_default(),
544            start_line,
545            end_line,
546        }),
547        reason: sig.reason,
548        producer_module: sig.producer.module,
549        producer_version: sig.producer.version,
550        computed_at: Some(prost_types::Timestamp {
551            seconds: sig.computed_at,
552            nanos: 0,
553        }),
554        visibility: visibility.to_string(),
555    }
556}
557
558// ---------------------------------------------------------------------------
559// Symbol extraction + discussion projection (mirrors hosted impl).
560// ---------------------------------------------------------------------------
561
562/// Symbol projection for the reading-order partition. Mirrors the hosted-handler
563/// implementation: when the `semantic` feature is enabled and the
564/// changed path has a tree-sitter parser and a readable new-side blob, emits
565/// one [`PathSymbol`] per definition. Otherwise falls back to a single path-only
566/// entry (kind = `Other`), which keeps deletes and gitlink pointer changes
567/// visible even though they do not carry Heddle blob content.
568fn changed_files_as_symbols(
569    repo: &Repository,
570    state: &State,
571    changed_paths: &[ChangedPath],
572) -> objects::error::Result<Vec<PathSymbol>> {
573    let new_tree = match repo.store().get_tree(&state.tree)? {
574        Some(t) => t,
575        None => return Ok(Vec::new()),
576    };
577    let new_files = collect_files(repo, &new_tree, "")?;
578
579    let mut out: Vec<PathSymbol> = Vec::new();
580    for path_kind in changed_paths {
581        let path = &path_kind.path;
582        #[cfg_attr(not(feature = "semantic"), allow(unused_mut))]
583        let mut emitted_any = false;
584        if let Some(hash) = new_files.get(path) {
585            #[cfg(feature = "semantic")]
586            {
587                if let Some(blob) = repo.store().get_blob(hash)? {
588                    emitted_any = extract_file_symbols(path, blob.content(), &mut out);
589                }
590            }
591            #[cfg(not(feature = "semantic"))]
592            {
593                let _ = hash;
594            }
595        }
596        if !emitted_any {
597            out.push(PathSymbol {
598                file: path.clone(),
599                symbol: path.clone(),
600                kind: SymbolKind::Other,
601            });
602        }
603    }
604    Ok(out)
605}
606
607#[cfg(feature = "semantic")]
608fn extract_file_symbols(path: &str, source: &[u8], out: &mut Vec<PathSymbol>) -> bool {
609    use ::semantic::symbol_resolver::{Definition, DefinitionKind, extract_definitions};
610    let definitions: Vec<Definition> = match extract_definitions(source, std::path::Path::new(path))
611    {
612        Ok(defs) => defs,
613        Err(_) => return false,
614    };
615    if definitions.is_empty() {
616        return false;
617    }
618    for d in definitions {
619        let kind = match d.kind {
620            DefinitionKind::Type => SymbolKind::Type,
621            DefinitionKind::Trait => SymbolKind::Trait,
622            DefinitionKind::Class => SymbolKind::Class,
623            DefinitionKind::Interface => SymbolKind::Interface,
624            DefinitionKind::TypeAlias => SymbolKind::TypeAlias,
625            DefinitionKind::EnumDef => SymbolKind::EnumDef,
626            DefinitionKind::ConstDecl => SymbolKind::ConstDecl,
627            DefinitionKind::Module => SymbolKind::Module,
628            DefinitionKind::Function => SymbolKind::Function,
629            DefinitionKind::Other => SymbolKind::Other,
630        };
631        let symbol = match d.parent_name.as_deref() {
632            Some(parent) if !parent.is_empty() => format!("{parent}::{}", d.name),
633            _ => d.name,
634        };
635        out.push(PathSymbol {
636            file: path.to_string(),
637            symbol,
638            kind,
639        });
640    }
641    true
642}
643
644fn collect_files(
645    repo: &Repository,
646    tree: &objects::object::Tree,
647    prefix: &str,
648) -> objects::error::Result<std::collections::HashMap<String, objects::object::ContentHash>> {
649    let mut out = std::collections::HashMap::new();
650    for entry in tree.entries() {
651        let path = if prefix.is_empty() {
652            entry.name().to_string()
653        } else {
654            format!("{prefix}/{}", entry.name())
655        };
656        if entry.is_tree() {
657            if let Some(hash) = entry.tree_hash()
658                && let Some(subtree) = repo.store().get_tree(&hash)?
659            {
660                let sub = collect_files(repo, &subtree, &path)?;
661                out.extend(sub);
662            }
663        } else if let Some(hash) = entry.content_hash() {
664            out.insert(path, hash);
665        }
666    }
667    Ok(out)
668}
669
670fn partition_to_proto(p: ReadingOrderPartition) -> ProtoReadingOrderPartition {
671    ProtoReadingOrderPartition {
672        structural: p.structural.iter().map(path_symbol_to_proto).collect(),
673        consequence: p.consequence.iter().map(path_symbol_to_proto).collect(),
674        tests_and_docs: p.tests_and_docs.iter().map(path_symbol_to_proto).collect(),
675    }
676}
677
678fn path_symbol_to_proto(p: &PathSymbol) -> ProtoPathSymbolRef {
679    ProtoPathSymbolRef {
680        file: p.file.clone(),
681        symbol: p.symbol.clone(),
682    }
683}
684
685fn discussion_to_anchored_proto(d: &Discussion) -> AnchoredDiscussion {
686    AnchoredDiscussion {
687        id: d.id.clone(),
688        anchor: Some(ProtoPathSymbolRef {
689            file: d.anchor.file.clone(),
690            symbol: d.anchor.symbol.clone(),
691        }),
692        opened_against_state: d.opened_against_state.as_bytes().to_vec(),
693        opened_at: Some(prost_types::Timestamp {
694            seconds: d.opened_at,
695            nanos: 0,
696        }),
697        turns: d
698            .turns
699            .iter()
700            .map(|t| grpc::heddle::v1::DiscussionTurn {
701                author_name: t.author.name.clone(),
702                author_email: t.author.email.clone(),
703                body: t.body.clone(),
704                posted_at: Some(prost_types::Timestamp {
705                    seconds: t.posted_at,
706                    nanos: 0,
707                }),
708            })
709            .collect(),
710        resolution: Some(discussion_resolution_to_proto(&d.resolution)),
711        body_changed_since_open: d.body_changed_since_open,
712        orphaned: d.orphaned,
713        visibility: d.visibility.as_str().to_string(),
714    }
715}
716
717fn discussion_resolution_to_proto(
718    resolution: &DiscussionResolution,
719) -> grpc::heddle::v1::DiscussionResolution {
720    use grpc::heddle::v1::discussion_resolution::{
721        Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
722    };
723    let state = match resolution {
724        DiscussionResolution::Open => State::Open(Open {}),
725        DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
726            State::IntoAnnotation(ResolvedIntoAnnotation {
727                annotation_id: annotation_id.clone(),
728            })
729        }
730        DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
731            state_id: state_id.as_bytes().to_vec(),
732        }),
733        DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
734            reason: reason.clone(),
735        }),
736    };
737    grpc::heddle::v1::DiscussionResolution { state: Some(state) }
738}
739
740// ---------------------------------------------------------------------------
741// Diff summary helpers (state.tree vs first parent's tree).
742// ---------------------------------------------------------------------------
743
744/// File-change kinds we surface in the diff summary signal anchors.
745/// Mirrors `objects::object::DiffKind` minus the `Unchanged` variant
746/// (we filter those out before constructing this).
747#[derive(Debug, Clone)]
748struct ChangedPath {
749    path: String,
750    kind: DiffKind,
751}
752
753impl ChangedPath {
754    fn kind_str(&self) -> &'static str {
755        match self.kind {
756            DiffKind::Added => "added",
757            DiffKind::Modified => "modified",
758            DiffKind::Deleted => "deleted",
759            DiffKind::Unchanged => "unchanged",
760        }
761    }
762}
763
764/// Aggregated counts plus a path list, computed by diffing
765/// `state.tree` against the first parent's tree (or empty when the
766/// state is a root). When `state.parents` is empty every file in the
767/// state's tree counts as added, which makes "first capture" reviews
768/// non-empty too. The `_state` prefix on `_state` is intentional: the
769/// helper currently only reads `state.tree` and `state.parents`.
770struct DiffSummary {
771    files_changed: u32,
772    added_files: u32,
773    modified_files: u32,
774    deleted_files: u32,
775    added_lines: u32,
776    removed_lines: u32,
777    changed_paths: Vec<ChangedPath>,
778}
779
780/// Compute a summary diff for `state` vs its first parent. Errors
781/// from the object store propagate; missing trees / blobs are skipped
782/// silently (treated as zero-change for that path) so a partially
783/// pruned object store never blocks the review surface. The
784/// distinction matters: missing-object errors must become zero (the
785/// summary is best-effort, callers want a payload they can render),
786/// but genuine I/O errors must still propagate so a corrupt store
787/// surfaces loudly instead of silently truncating the review.
788fn compute_state_diff_summary(
789    repo: &Repository,
790    state: &State,
791) -> objects::error::Result<DiffSummary> {
792    use objects::object::Tree;
793    let parent_tree_hash = if let Some(parent_id) = state.parents.first() {
794        match repo.store().get_state(parent_id)? {
795            Some(parent_state) => parent_state.tree,
796            None => Tree::new().hash(),
797        }
798    } else {
799        Tree::new().hash()
800    };
801
802    // Resolve both tree objects up front so the missing-tree case
803    // becomes a synthesized empty changeset rather than an error from
804    // the recursive diff. `get_tree` returns `Ok(None)` for missing
805    // (not an error), and propagates only on genuine I/O — matching
806    // the policy the doc-comment claims.
807    let parent_tree_obj = repo.store().get_tree(&parent_tree_hash)?;
808    let new_tree_obj = repo.store().get_tree(&state.tree)?;
809
810    // If either tree is missing from the local store the diff is not
811    // meaningful — return an empty summary instead of erroring out.
812    // This mirrors the "Modified branch tolerates missing blobs" stance
813    // for the *tree* level: a partially pruned store should never block
814    // review payload retrieval, only render an empty summary.
815    let changes = if parent_tree_obj.is_some() && new_tree_obj.is_some() {
816        repo.diff_trees(&parent_tree_hash, &state.tree)?
817    } else {
818        objects::object::FileChangeSet::new()
819    };
820
821    // Compute per-file line deltas. We only count `Modified` here for
822    // the symmetric add/remove totals; `Added` files contribute every
823    // line as an add, and `Deleted` files contribute every line as a
824    // remove. Files with non-utf8 contents (e.g. binaries) silently
825    // contribute zero — `diff_blobs` already returns an empty vec in
826    // that case, and we mirror the same behavior for raw line counts.
827    let mut added_lines: u32 = 0;
828    let mut removed_lines: u32 = 0;
829    let mut changed_paths: Vec<ChangedPath> = Vec::with_capacity(changes.len());
830
831    let parent_files = match parent_tree_obj.as_ref() {
832        Some(t) => collect_files(repo, t, "")?,
833        None => std::collections::HashMap::new(),
834    };
835    let new_files = match new_tree_obj.as_ref() {
836        Some(t) => collect_files(repo, t, "")?,
837        None => std::collections::HashMap::new(),
838    };
839
840    let mut added_files: u32 = 0;
841    let mut modified_files: u32 = 0;
842    let mut deleted_files: u32 = 0;
843
844    for change in changes.iter() {
845        match change.kind {
846            DiffKind::Added => {
847                added_files += 1;
848                // Missing blob (`get_blob` returns `Ok(None)`) → file
849                // counts but contributes zero lines. Genuine I/O
850                // errors still propagate via `?` — same shape as the
851                // Modified branch's intent, but here we keep the
852                // distinction explicit so a corrupt store surfaces
853                // rather than getting silently swallowed.
854                if let Some(hash) = new_files.get(&change.path)
855                    && let Some(blob) = repo.store().get_blob(hash)?
856                {
857                    added_lines = added_lines.saturating_add(line_count(blob.content()));
858                }
859            }
860            DiffKind::Deleted => {
861                deleted_files += 1;
862                if let Some(hash) = parent_files.get(&change.path)
863                    && let Some(blob) = repo.store().get_blob(hash)?
864                {
865                    removed_lines = removed_lines.saturating_add(line_count(blob.content()));
866                }
867            }
868            DiffKind::Modified => {
869                modified_files += 1;
870                // `get_blob` already returns `Ok(None)` for a missing
871                // blob, so `?` here only fires on genuine I/O. Match
872                // the Added/Deleted branches' propagation policy
873                // explicitly instead of the older `.ok().flatten()`
874                // form, which silently swallowed IO errors and
875                // conflated them with "missing".
876                let old_blob = match parent_files.get(&change.path) {
877                    Some(h) => repo.store().get_blob(h)?,
878                    None => None,
879                };
880                let new_blob = match new_files.get(&change.path) {
881                    Some(h) => repo.store().get_blob(h)?,
882                    None => None,
883                };
884                if let (Some(old), Some(new)) = (old_blob, new_blob) {
885                    for line in diff_blobs(&old, &new) {
886                        match line {
887                            objects::worktree::DiffLine::Added(_) => {
888                                added_lines = added_lines.saturating_add(1);
889                            }
890                            objects::worktree::DiffLine::Removed(_) => {
891                                removed_lines = removed_lines.saturating_add(1);
892                            }
893                            objects::worktree::DiffLine::Context(_) => {}
894                        }
895                    }
896                }
897            }
898            DiffKind::Unchanged => continue,
899        }
900        changed_paths.push(ChangedPath {
901            path: change.path.clone(),
902            kind: change.kind,
903        });
904    }
905
906    Ok(DiffSummary {
907        files_changed: changed_paths.len() as u32,
908        added_files,
909        modified_files,
910        deleted_files,
911        added_lines,
912        removed_lines,
913        changed_paths,
914    })
915}
916
917/// Count the number of newline-separated lines in a file blob. Binary
918/// blobs (non-utf8) count as zero — we deliberately don't byte-count
919/// them, since "lines" is meaningless for binary content. A trailing
920/// newline does not introduce a phantom empty line.
921fn line_count(content: &[u8]) -> u32 {
922    let Ok(s) = std::str::from_utf8(content) else {
923        return 0;
924    };
925    if s.is_empty() {
926        return 0;
927    }
928    let trimmed = s.strip_suffix('\n').unwrap_or(s);
929    if trimmed.is_empty() {
930        return 1;
931    }
932    (trimmed.matches('\n').count() as u32).saturating_add(1)
933}
934
935// ---------------------------------------------------------------------------
936// Tests
937// ---------------------------------------------------------------------------
938
939#[cfg(test)]
940mod tests {
941    use std::sync::Arc;
942
943    use crypto::Signer as _;
944    use grpc::heddle::v1::ReviewScope as ProtoReviewScope;
945    use repo::{Repository, operation_dedup::OperationDedupStore};
946    use tempfile::TempDir;
947
948    use super::*;
949
950    fn fresh_service() -> (LocalStateReviewService, Arc<Repository>, TempDir) {
951        let temp = TempDir::new().expect("create tempdir");
952        // SAFETY: tests run with a controlled environment; setting these
953        // env vars steers the default attribution into a predictable place.
954        // SAFETY: setting env vars in tests; rust 2024 marks these unsafe.
955        unsafe {
956            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
957            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
958        }
959        let repo = Repository::init_default(temp.path()).expect("init repo");
960        let dedup = OperationDedupStore::open(repo.heddle_dir()).expect("open dedup");
961        let repo = Arc::new(repo);
962        let svc =
963            LocalStateReviewService::new(GrpcLocalService::new(repo.clone(), Arc::new(dedup)));
964        (svc, repo, temp)
965    }
966
967    fn capture_state(repo: &Repository) -> ChangeId {
968        // Write a tiny file so snapshot has something to capture.
969        std::fs::write(repo.root().join("hello.txt"), b"hi").expect("write file");
970        let state = repo
971            .snapshot(Some("seed".to_string()), None)
972            .expect("snapshot");
973        state.change_id
974    }
975
976    fn sign_request(state_id: &ChangeId, op_id: &str) -> SignStateRequest {
977        let signer = crypto::Ed25519Signer::generate().expect("generate ed25519 key");
978        let scope = ReviewScope::WholeChange;
979        let signed_at = chrono::Utc::now().timestamp();
980        let payload = signing_payload(*state_id, ReviewKind::Read, &scope, signed_at, None);
981        let signature = signer.sign(&payload).expect("sign payload");
982        use grpc::heddle::v1::review_scope::{Scope, WholeChange};
983        SignStateRequest {
984            repo_path: String::new(),
985            state_id: state_id.as_bytes().to_vec(),
986            kind: grpc::heddle::v1::ReviewKind::Read as i32,
987            scope: Some(ProtoReviewScope {
988                scope: Some(Scope::WholeChange(WholeChange {})),
989            }),
990            justification: String::new(),
991            algorithm: "ed25519".to_string(),
992            public_key: signer.public_key().to_vec(),
993            signature: signature.clone(),
994            signed_at: Some(prost_types::Timestamp {
995                seconds: signed_at,
996                nanos: 0,
997            }),
998            client_operation_id: op_id.to_string(),
999        }
1000    }
1001
1002    #[tokio::test]
1003    #[serial_test::serial(process_global)]
1004    async fn sign_state_persists_to_review_signatures_blob() {
1005        let (svc, repo, _tmp) = fresh_service();
1006        let state_id = capture_state(&repo);
1007
1008        let resp = svc
1009            .sign_state(Request::new(sign_request(&state_id, "")))
1010            .await
1011            .expect("sign_state");
1012        assert!(!resp.get_ref().signature_id.is_empty());
1013        assert_eq!(resp.get_ref().state_id, state_id.as_bytes().to_vec());
1014
1015        let listing = svc
1016            .list_signatures(Request::new(ListSignaturesRequest {
1017                repo_path: String::new(),
1018                state_id: state_id.as_bytes().to_vec(),
1019            }))
1020            .await
1021            .expect("list_signatures");
1022        let sigs = &listing.get_ref().signatures;
1023        assert_eq!(sigs.len(), 1, "expected one signature, got {sigs:?}");
1024        assert_eq!(sigs[0].kind, grpc::heddle::v1::ReviewKind::Read as i32);
1025        assert_eq!(sigs[0].algorithm, "ed25519");
1026        assert_eq!(sigs[0].actor_name, "Alice Tester");
1027        assert_eq!(sigs[0].actor_email, "alice@example.com");
1028        let scope_case = sigs[0].scope.as_ref().and_then(|s| s.scope.as_ref());
1029        assert!(matches!(
1030            scope_case,
1031            Some(grpc::heddle::v1::review_scope::Scope::WholeChange(_))
1032        ));
1033    }
1034
1035    #[tokio::test]
1036    #[serial_test::serial(process_global)]
1037    async fn sign_state_idempotent() {
1038        let (svc, repo, _tmp) = fresh_service();
1039        let state_id = capture_state(&repo);
1040        let op_id = objects::object::OperationId::new().to_string();
1041        // The second call must replay the *same* request body — fresh
1042        // signatures hash differently, so we build once and clone.
1043        let req = sign_request(&state_id, &op_id);
1044
1045        let first = svc
1046            .sign_state(Request::new(req.clone()))
1047            .await
1048            .expect("first sign_state");
1049        let second = svc
1050            .sign_state(Request::new(req))
1051            .await
1052            .expect("second sign_state");
1053        assert_eq!(
1054            first.get_ref().signature_id,
1055            second.get_ref().signature_id,
1056            "replayed call must return the same signature_id"
1057        );
1058
1059        let listing = svc
1060            .list_signatures(Request::new(ListSignaturesRequest {
1061                repo_path: String::new(),
1062                state_id: state_id.as_bytes().to_vec(),
1063            }))
1064            .await
1065            .expect("list_signatures");
1066        assert_eq!(
1067            listing.get_ref().signatures.len(),
1068            1,
1069            "idempotent replay must not append a duplicate signature"
1070        );
1071    }
1072
1073    #[tokio::test]
1074    #[serial_test::serial(process_global)]
1075    async fn sign_state_rejects_forged_signature() {
1076        let (svc, repo, _tmp) = fresh_service();
1077        let state_id = capture_state(&repo);
1078        let mut req = sign_request(&state_id, "");
1079        // Flip the last byte of the signature so verification fails.
1080        let last = req.signature.len() - 1;
1081        req.signature[last] ^= 0xff;
1082
1083        let err = svc
1084            .sign_state(Request::new(req))
1085            .await
1086            .expect_err("forged signature must be rejected");
1087        assert_eq!(err.code(), tonic::Code::InvalidArgument, "{err:?}");
1088        assert!(
1089            err.message().contains("failed verification"),
1090            "unexpected error message: {}",
1091            err.message()
1092        );
1093    }
1094
1095    #[tokio::test]
1096    #[serial_test::serial(process_global)]
1097    async fn sign_state_rejects_skewed_timestamp() {
1098        let (svc, repo, _tmp) = fresh_service();
1099        let state_id = capture_state(&repo);
1100        let mut req = sign_request(&state_id, "");
1101        // Timestamp 1 hour in the future is well outside the skew window.
1102        if let Some(ts) = req.signed_at.as_mut() {
1103            ts.seconds += 60 * 60;
1104        }
1105
1106        let err = svc
1107            .sign_state(Request::new(req))
1108            .await
1109            .expect_err("skewed timestamp must be rejected");
1110        assert_eq!(err.code(), tonic::Code::InvalidArgument);
1111        assert!(err.message().contains("too far from server time"));
1112    }
1113
1114    #[tokio::test]
1115    #[serial_test::serial(process_global)]
1116    async fn sign_state_attributes_to_caller_not_state_author() {
1117        // Regression for the codex-flagged bug: sign_state used to
1118        // attribute the signature to the state's author. Bob signing
1119        // Alice's state would record Alice. The signature must record
1120        // the *caller*. In local mode the caller is resolved via
1121        // `Repository::get_principal()` (env vars then config).
1122        let (svc, repo, _tmp) = fresh_service();
1123        // `fresh_service` already set the env to Alice; capture the
1124        // state under Alice's identity so `state.attribution.principal`
1125        // is Alice.
1126        let state_id = capture_state(&repo);
1127
1128        // Now switch the local user to Bob and sign Alice's state.
1129        // SAFETY: tests run with a controlled environment.
1130        unsafe {
1131            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Bob Signer");
1132            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "bob@example.com");
1133        }
1134
1135        svc.sign_state(Request::new(sign_request(&state_id, "")))
1136            .await
1137            .expect("sign_state");
1138
1139        let listing = svc
1140            .list_signatures(Request::new(ListSignaturesRequest {
1141                repo_path: String::new(),
1142                state_id: state_id.as_bytes().to_vec(),
1143            }))
1144            .await
1145            .expect("list_signatures");
1146        let sigs = &listing.get_ref().signatures;
1147        assert_eq!(sigs.len(), 1);
1148        assert_eq!(
1149            sigs[0].actor_name, "Bob Signer",
1150            "signature must attribute to the caller (Bob), not the state author (Alice)"
1151        );
1152        assert_eq!(sigs[0].actor_email, "bob@example.com");
1153
1154        // Restore the env so other serial tests see the expected
1155        // Alice baseline.
1156        unsafe {
1157            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
1158            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
1159        }
1160    }
1161
1162    #[tokio::test]
1163    #[serial_test::serial(process_global)]
1164    async fn sign_state_serializes_concurrent_appends() {
1165        // Regression for the codex-flagged race: two SignStates with
1166        // different operation ids could both read the same base
1167        // `review_signatures` blob, then the second `put_state` would
1168        // clobber the first signature. The fix wraps the
1169        // read-modify-write in `repo.locker().write()` and re-loads
1170        // the state inside the lock.
1171        let (svc, repo, _tmp) = fresh_service();
1172        let state_id = capture_state(&repo);
1173
1174        // Two distinct ops so neither replays the other through dedup.
1175        let op_a = objects::object::OperationId::new().to_string();
1176        let op_b = objects::object::OperationId::new().to_string();
1177        let req_a = sign_request(&state_id, &op_a);
1178        let req_b = sign_request(&state_id, &op_b);
1179
1180        let svc_a = svc.clone();
1181        let svc_b = svc.clone();
1182        let (a, b) = tokio::join!(
1183            svc_a.sign_state(Request::new(req_a)),
1184            svc_b.sign_state(Request::new(req_b)),
1185        );
1186        a.expect("first sign_state");
1187        b.expect("second sign_state");
1188
1189        let listing = svc
1190            .list_signatures(Request::new(ListSignaturesRequest {
1191                repo_path: String::new(),
1192                state_id: state_id.as_bytes().to_vec(),
1193            }))
1194            .await
1195            .expect("list_signatures");
1196        assert_eq!(
1197            listing.get_ref().signatures.len(),
1198            2,
1199            "both concurrent signatures must land — neither should be lost \
1200             to a stale-blob clobber"
1201        );
1202    }
1203
1204    /// Regression: `get_review_payload` previously returned
1205    /// `summary.files_changed = 0` and empty `in_budget_signals` for
1206    /// every state, even when the diff against the parent had real
1207    /// content. This test snapshots once (root state — every file is
1208    /// "added"), then snapshots again with a modification, and asserts
1209    /// both states report a non-empty diff summary plus a populated
1210    /// `diff_summary` signal anchored on the changed file.
1211    #[tokio::test]
1212    #[serial_test::serial(process_global)]
1213    async fn get_review_payload_populates_diff_summary_and_signals() {
1214        let (svc, repo, _tmp) = fresh_service();
1215
1216        // First capture: root state, one file added. Every line of
1217        // `hello.txt` should count as added.
1218        std::fs::write(repo.root().join("hello.txt"), b"first\nsecond\nthird\n")
1219            .expect("write hello.txt");
1220        let first = repo
1221            .snapshot(Some("first capture".to_string()), None)
1222            .expect("first snapshot");
1223
1224        let resp_first = svc
1225            .get_review_payload(Request::new(GetReviewPayloadRequest {
1226                repo_path: String::new(),
1227                state_id: first.change_id.as_bytes().to_vec(),
1228                include_all_signals: false,
1229            }))
1230            .await
1231            .expect("get_review_payload first");
1232        let payload_first = resp_first.into_inner();
1233        let summary_first = payload_first.summary.as_ref().expect("summary present");
1234        assert!(
1235            summary_first.files_changed >= 1,
1236            "first state should report at least one file changed (vs empty parent), got {}",
1237            summary_first.files_changed
1238        );
1239        assert!(
1240            summary_first.added_lines >= 3,
1241            "first state should report 3+ added lines, got {}",
1242            summary_first.added_lines
1243        );
1244        assert!(
1245            !payload_first.in_budget_signals.is_empty(),
1246            "in_budget_signals must include a diff_summary entry"
1247        );
1248        let first_signal = &payload_first.in_budget_signals[0];
1249        assert!(
1250            first_signal.kind.starts_with("diff_summary."),
1251            "expected synthetic diff_summary signal kind, got {}",
1252            first_signal.kind
1253        );
1254        assert_eq!(first_signal.producer_module, "review_show.diff_summary");
1255        assert_eq!(first_signal.visibility, "visible");
1256
1257        // Second capture: modify the file. Diff vs the first state's
1258        // tree should report a single modified file with at least one
1259        // added and one removed line.
1260        std::fs::write(
1261            repo.root().join("hello.txt"),
1262            b"first\nsecond\nthird\nfourth\n",
1263        )
1264        .expect("modify hello.txt");
1265        let second = repo
1266            .snapshot(Some("second capture".to_string()), None)
1267            .expect("second snapshot");
1268
1269        let resp_second = svc
1270            .get_review_payload(Request::new(GetReviewPayloadRequest {
1271                repo_path: String::new(),
1272                state_id: second.change_id.as_bytes().to_vec(),
1273                include_all_signals: false,
1274            }))
1275            .await
1276            .expect("get_review_payload second");
1277        let payload_second = resp_second.into_inner();
1278        let summary_second = payload_second.summary.as_ref().expect("summary present");
1279        assert_eq!(
1280            summary_second.files_changed, 1,
1281            "second state should report exactly one modified file"
1282        );
1283        assert!(
1284            summary_second.added_lines >= 1,
1285            "second state should report at least one added line, got {}",
1286            summary_second.added_lines
1287        );
1288        assert!(
1289            !payload_second.in_budget_signals.is_empty(),
1290            "in_budget_signals must include a diff_summary entry"
1291        );
1292        let signal = &payload_second.in_budget_signals[0];
1293        assert_eq!(
1294            signal
1295                .anchor
1296                .as_ref()
1297                .map(|a| a.file.as_str())
1298                .unwrap_or(""),
1299            "hello.txt",
1300            "diff_summary signal should anchor on the modified file"
1301        );
1302        assert!(
1303            signal.reason.contains("files changed"),
1304            "first signal reason should carry the aggregate summary, got {}",
1305            signal.reason
1306        );
1307        // The summary's signal-count fields should reflect the visible
1308        // budget so consumers can short-circuit on empty-vs-populated
1309        // without re-counting the array.
1310        assert_eq!(
1311            summary_second.in_budget_signal_count,
1312            payload_second.in_budget_signals.len() as u32,
1313            "in_budget_signal_count must match the array length"
1314        );
1315    }
1316
1317    #[tokio::test]
1318    #[serial_test::serial(process_global)]
1319    async fn get_review_payload_surfaces_gitlink_target_changes() {
1320        let (svc, repo, _tmp) = fresh_service();
1321
1322        let old_target = "0303030303030303030303030303030303030303"
1323            .parse()
1324            .expect("old git oid");
1325        let new_target = "0404040404040404040404040404040404040404"
1326            .parse()
1327            .expect("new git oid");
1328        let old_tree = objects::object::Tree::from_entries(vec![
1329            objects::object::TreeEntry::gitlink("vendor", old_target).expect("old gitlink"),
1330        ]);
1331        let new_tree = objects::object::Tree::from_entries(vec![
1332            objects::object::TreeEntry::gitlink("vendor", new_target).expect("new gitlink"),
1333        ]);
1334        let old_tree_hash = repo.store().put_tree(&old_tree).expect("put old tree");
1335        let new_tree_hash = repo.store().put_tree(&new_tree).expect("put new tree");
1336        let base = State::new_snapshot(
1337            old_tree_hash,
1338            Vec::new(),
1339            objects::object::Attribution::human(objects::object::Principal::new(
1340                "Gitlink Reviewer",
1341                "gitlink@example.test",
1342            )),
1343        );
1344        let base_id = base.change_id;
1345        repo.store().put_state(&base).expect("put base state");
1346        let changed = State::new_snapshot(
1347            new_tree_hash,
1348            vec![base_id],
1349            objects::object::Attribution::human(objects::object::Principal::new(
1350                "Gitlink Reviewer",
1351                "gitlink@example.test",
1352            )),
1353        );
1354        let changed_id = changed.change_id;
1355        repo.store().put_state(&changed).expect("put changed state");
1356
1357        let resp = svc
1358            .get_review_payload(Request::new(GetReviewPayloadRequest {
1359                repo_path: String::new(),
1360                state_id: changed_id.as_bytes().to_vec(),
1361                include_all_signals: false,
1362            }))
1363            .await
1364            .expect("get_review_payload gitlink change");
1365        let payload = resp.into_inner();
1366        let summary = payload.summary.as_ref().expect("summary present");
1367        assert_eq!(
1368            summary.files_changed, 1,
1369            "gitlink pointer change should count as one changed path"
1370        );
1371        assert_eq!(summary.added_lines, 0);
1372        assert_eq!(summary.removed_lines, 0);
1373        assert_eq!(
1374            payload
1375                .in_budget_signals
1376                .first()
1377                .and_then(|signal| signal.anchor.as_ref())
1378                .map(|anchor| anchor.file.as_str()),
1379            Some("vendor"),
1380            "diff_summary signal should be anchored on the gitlink path"
1381        );
1382        let partition = payload.partition.expect("partition present");
1383        let surfaced = partition
1384            .structural
1385            .iter()
1386            .chain(partition.consequence.iter())
1387            .chain(partition.tests_and_docs.iter())
1388            .any(|symbol| symbol.file == "vendor" && symbol.symbol == "vendor");
1389        assert!(surfaced, "gitlink change should be path-visible in review");
1390    }
1391
1392    /// Regression for codex feedback on PRs #52 (tree fallback) and
1393    /// #56 (blob fallback): `compute_state_diff_summary` must tolerate
1394    /// missing trees and blobs by returning an empty/partial summary
1395    /// rather than blocking the entire review payload. Construct a
1396    /// state whose `tree` points to a content hash that was never
1397    /// stored — `get_tree` returns `Ok(None)`, and the function must
1398    /// fall back to an empty changeset instead of erroring out of
1399    /// `diff_trees`. (Pre-fix, the Modified branch already tolerated
1400    /// missing blobs via `.ok().flatten()`, but the Added/Deleted
1401    /// branches and the top-level diff_trees call did not — surfaces
1402    /// of pruned object stores all errored out inconsistently.)
1403    #[tokio::test]
1404    #[serial_test::serial(process_global)]
1405    async fn get_review_payload_tolerates_missing_tree() {
1406        let (svc, repo, _tmp) = fresh_service();
1407        let state_id = capture_state(&repo);
1408
1409        // Load the state, swap its tree pointer to a synthetic hash
1410        // that nothing references, and re-persist. The state object
1411        // itself survives, but `repo.store().get_tree(state.tree)`
1412        // will return `Ok(None)` — the missing-tree case.
1413        let state = repo
1414            .store()
1415            .get_state(&state_id)
1416            .expect("get state")
1417            .expect("state present");
1418        let bogus_tree = objects::object::ContentHash::compute(b"definitely-not-in-store-bytes");
1419        let mut mutated = state.clone();
1420        mutated.tree = bogus_tree;
1421        repo.store().put_state(&mutated).expect("put mutated state");
1422
1423        // The review payload must still come back — empty summary,
1424        // but no Status::Internal error from the gRPC layer.
1425        let resp = svc
1426            .get_review_payload(Request::new(GetReviewPayloadRequest {
1427                repo_path: String::new(),
1428                state_id: state_id.as_bytes().to_vec(),
1429                include_all_signals: false,
1430            }))
1431            .await
1432            .expect("missing tree must not block review payload");
1433        let payload = resp.into_inner();
1434        let summary = payload.summary.as_ref().expect("summary present");
1435        assert_eq!(
1436            summary.files_changed, 0,
1437            "missing tree must produce a zero-change summary, got {} files",
1438            summary.files_changed
1439        );
1440        // Synthetic diff_summary signal should still be present (with
1441        // the `empty` kind) so consumers always see at least one
1442        // signal — keeps the wire shape stable.
1443        assert!(
1444            !payload.in_budget_signals.is_empty(),
1445            "in_budget_signals should always contain at least the synthetic diff_summary entry"
1446        );
1447        let kind = &payload.in_budget_signals[0].kind;
1448        assert!(
1449            kind.starts_with("diff_summary."),
1450            "expected synthetic diff_summary signal, got {kind}"
1451        );
1452    }
1453
1454    /// `line_count` should match git-style line counts — trailing
1455    /// newline never produces a phantom empty line, but an unterminated
1456    /// final line still counts.
1457    #[test]
1458    fn line_count_matches_git_semantics() {
1459        assert_eq!(line_count(b""), 0);
1460        assert_eq!(line_count(b"\n"), 1);
1461        assert_eq!(line_count(b"hello"), 1);
1462        assert_eq!(line_count(b"hello\n"), 1);
1463        assert_eq!(line_count(b"hello\nworld"), 2);
1464        assert_eq!(line_count(b"hello\nworld\n"), 2);
1465        assert_eq!(line_count(b"a\nb\nc\n"), 3);
1466        // Non-utf8 bytes count as zero (treated as binary).
1467        assert_eq!(line_count(&[0xff, 0xfe, 0xfd]), 0);
1468    }
1469}