1use ::state_review::{
12 PathSymbol, ReadingOrderPartition, SymbolKind, build_review_payload_partition,
13};
14use crypto::verify_payload_signature;
15use grpc::heddle::v1::{
16 AnchoredDiscussion, GetReviewPayloadRequest, ListSignaturesRequest, ListSignaturesResponse,
17 MergeRequirement, PathSymbolRef as ProtoPathSymbolRef,
18 ReadingOrderPartition as ProtoReadingOrderPartition, ReviewPayload,
19 ReviewScope as ProtoReviewScope, ReviewSignature as ProtoReviewSignature, ReviewSummary,
20 RiskSignal as ProtoRiskSignal, SignStateRequest, SignStateResponse,
21 SignalAnchor as ProtoSignalAnchor, SigningFooter,
22 state_review_service_server::StateReviewService,
23};
24use objects::{
25 lock::RepositoryLockExt,
26 object::{
27 Blob, ChangeId, DiffKind, Discussion, DiscussionResolution, DiscussionsBlob, ReviewKind,
28 ReviewScope, ReviewSignature, ReviewSignaturesBlob, RiskSignalBlob, State, SymbolAnchor,
29 signing_payload,
30 },
31 store::ObjectStore,
32 worktree::diff_blobs,
33};
34use prost::Message;
35use repo::Repository;
36use tonic::{Request, Response, Status};
37
38use super::{GrpcLocalService, to_status, with_idempotency};
39
/// Maximum tolerated distance, in seconds, between a request's `signed_at`
/// and the server clock (5 minutes). `execute_sign_state` rejects signing
/// requests whose timestamp falls outside this window.
const SIGN_TIMESTAMP_SKEW_SECS: i64 = 5 * 60;
44
/// `StateReviewService` gRPC implementation that serves requests from a
/// `GrpcLocalService`.
#[derive(Clone)]
pub struct LocalStateReviewService {
    /// Underlying local service; supplies the repository handle via `repo()`
    /// and is handed to `with_idempotency` for `sign_state`.
    inner: GrpcLocalService,
}
50
51impl LocalStateReviewService {
52 pub fn new(inner: GrpcLocalService) -> Self {
53 Self { inner }
54 }
55}
56
57#[tonic::async_trait]
58impl StateReviewService for LocalStateReviewService {
59 async fn get_review_payload(
60 &self,
61 request: Request<GetReviewPayloadRequest>,
62 ) -> Result<Response<ReviewPayload>, Status> {
63 let req = request.into_inner();
64 let change_id = parse_change_id(&req.state_id)?;
65 let repo = self.inner.repo();
66 let state = repo
67 .store()
68 .get_state(&change_id)
69 .map_err(to_status)?
70 .ok_or_else(|| {
71 Status::not_found(format!("state {} not found", change_id.to_string_full()))
72 })?;
73
74 let diff_summary = compute_state_diff_summary(repo, &state).map_err(to_status)?;
80
81 let summary = ReviewSummary {
82 headline: state.intent.clone().unwrap_or_default(),
83 files_changed: diff_summary.files_changed,
84 added_lines: diff_summary.added_lines,
85 removed_lines: diff_summary.removed_lines,
86 in_budget_signal_count: 0,
87 hidden_signal_count: 0,
88 };
89
90 let agent_narrative = if state.attribution.agent.is_some() {
91 state.intent.clone().unwrap_or_default()
92 } else {
93 String::new()
94 };
95
96 let mut all_signals: Vec<ProtoRiskSignal> = Vec::new();
100 if req.include_all_signals
101 && let Some(hash) = state.risk_signals
102 && let Some(blob) = repo.store().get_blob(&hash).map_err(to_status)?
103 {
104 let decoded = RiskSignalBlob::decode(blob.content())
105 .map_err(|err| Status::internal(format!("decode risk signals: {err}")))?;
106 all_signals = decoded
107 .signals
108 .into_iter()
109 .map(|s| risk_signal_to_proto(s, "visible"))
110 .collect();
111 }
112
113 let mut in_budget_signals: Vec<ProtoRiskSignal> = Vec::new();
122 let summary_kind = match (
123 diff_summary.added_files,
124 diff_summary.modified_files,
125 diff_summary.deleted_files,
126 ) {
127 (a, 0, 0) if a > 0 => "diff_summary.added_only",
128 (0, m, 0) if m > 0 => "diff_summary.modified_only",
129 (0, 0, d) if d > 0 => "diff_summary.deleted_only",
130 (0, 0, 0) => "diff_summary.empty",
131 _ => "diff_summary.mixed",
132 };
133 let summary_reason = format!(
134 "{} files changed (+{}/-{}, {} added, {} modified, {} deleted)",
135 diff_summary.files_changed,
136 diff_summary.added_lines,
137 diff_summary.removed_lines,
138 diff_summary.added_files,
139 diff_summary.modified_files,
140 diff_summary.deleted_files,
141 );
142 const MAX_DIFF_SIGNAL_ANCHORS: usize = 32;
147 if diff_summary.changed_paths.is_empty() {
148 in_budget_signals.push(ProtoRiskSignal {
149 kind: summary_kind.to_string(),
150 anchor: Some(ProtoSignalAnchor {
151 file: String::new(),
152 symbol: String::new(),
153 start_line: 0,
154 end_line: 0,
155 }),
156 reason: summary_reason.clone(),
157 producer_module: "review_show.diff_summary".to_string(),
158 producer_version: 1,
159 computed_at: None,
160 visibility: "visible".to_string(),
161 });
162 } else {
163 for (idx, path_kind) in diff_summary
164 .changed_paths
165 .iter()
166 .take(MAX_DIFF_SIGNAL_ANCHORS)
167 .enumerate()
168 {
169 let reason = if idx == 0 {
170 summary_reason.clone()
171 } else {
172 format!("{} ({})", path_kind.path, path_kind.kind_str())
173 };
174 in_budget_signals.push(ProtoRiskSignal {
175 kind: summary_kind.to_string(),
176 anchor: Some(ProtoSignalAnchor {
177 file: path_kind.path.clone(),
178 symbol: String::new(),
179 start_line: 0,
180 end_line: 0,
181 }),
182 reason,
183 producer_module: "review_show.diff_summary".to_string(),
184 producer_version: 1,
185 computed_at: None,
186 visibility: "visible".to_string(),
187 });
188 }
189 }
190
191 let symbols = changed_files_as_symbols(repo, &state, &diff_summary.changed_paths)
195 .map_err(to_status)?;
196 let partition = build_review_payload_partition(&symbols);
197
198 let discussions = match state.discussions {
201 Some(hash) => {
202 let blob = repo
203 .store()
204 .get_blob(&hash)
205 .map_err(to_status)?
206 .ok_or_else(|| {
207 Status::internal(format!(
208 "discussions blob {} referenced by state {} is missing",
209 hash,
210 state.change_id.to_string_full()
211 ))
212 })?;
213 let decoded = DiscussionsBlob::decode(blob.content())
214 .map_err(|err| Status::internal(format!("decode discussions: {err}")))?;
215 decoded
216 .discussions
217 .iter()
218 .map(discussion_to_anchored_proto)
219 .collect()
220 }
221 None => Vec::<AnchoredDiscussion>::new(),
222 };
223
224 let mut summary = summary;
225 summary.in_budget_signal_count = in_budget_signals.len() as u32;
226 summary.hidden_signal_count =
227 all_signals.len().saturating_sub(in_budget_signals.len()) as u32;
228
229 let payload = ReviewPayload {
230 state_id: req.state_id.clone(),
231 summary: Some(summary),
232 agent_narrative,
233 partition: Some(partition_to_proto(partition)),
234 in_budget_signals,
235 all_signals,
236 tick_budget: 3,
237 discussions,
238 merge_requirements: Vec::<MergeRequirement>::new(),
241 signing_footer: Some(SigningFooter {
242 available_kinds: vec![
243 grpc::heddle::v1::ReviewKind::Read as i32,
244 grpc::heddle::v1::ReviewKind::AgentPreview as i32,
245 grpc::heddle::v1::ReviewKind::AgentCoReview as i32,
246 ],
247 }),
248 };
249
250 Ok(Response::new(payload))
251 }
252
253 async fn sign_state(
254 &self,
255 request: Request<SignStateRequest>,
256 ) -> Result<Response<SignStateResponse>, Status> {
257 let req = request.into_inner();
258 let req_bytes = req.encode_to_vec();
259 let client_operation_id = req.client_operation_id.clone();
260 let inner = self.inner.clone();
261
262 let response = with_idempotency(
263 &self.inner,
264 &client_operation_id,
265 "state_review.sign_state",
266 &req_bytes,
267 move || {
268 let inner = inner.clone();
269 async move { execute_sign_state(&inner, req).await }
270 },
271 )
272 .await?;
273
274 Ok(Response::new(response))
275 }
276
277 async fn list_signatures(
278 &self,
279 request: Request<ListSignaturesRequest>,
280 ) -> Result<Response<ListSignaturesResponse>, Status> {
281 let req = request.into_inner();
282 let change_id = parse_change_id(&req.state_id)?;
283 let repo = self.inner.repo();
284 let state = repo
285 .store()
286 .get_state(&change_id)
287 .map_err(to_status)?
288 .ok_or_else(|| {
289 Status::not_found(format!("state {} not found", change_id.to_string_full()))
290 })?;
291
292 let signatures = match state.review_signatures {
293 Some(hash) => {
294 let blob = repo
295 .store()
296 .get_blob(&hash)
297 .map_err(to_status)?
298 .ok_or_else(|| {
299 Status::internal(format!(
300 "review signatures blob {} missing from object store",
301 hash
302 ))
303 })?;
304 let decoded = ReviewSignaturesBlob::decode(blob.content())
305 .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?;
306 decoded
307 .signatures
308 .into_iter()
309 .enumerate()
310 .map(|(idx, sig)| review_signature_to_proto(sig, synthetic_signature_id(idx)))
311 .collect()
312 }
313 None => Vec::new(),
314 };
315
316 Ok(Response::new(ListSignaturesResponse { signatures }))
317 }
318}
319
320async fn execute_sign_state(
323 inner: &GrpcLocalService,
324 req: SignStateRequest,
325) -> Result<SignStateResponse, Status> {
326 let kind = match grpc::heddle::v1::ReviewKind::try_from(req.kind)
328 .map_err(|_| Status::invalid_argument(format!("unknown review kind tag {}", req.kind)))?
329 {
330 grpc::heddle::v1::ReviewKind::Read => ReviewKind::Read,
331 grpc::heddle::v1::ReviewKind::AgentPreview => ReviewKind::AgentPreview,
332 grpc::heddle::v1::ReviewKind::AgentCoReview => ReviewKind::AgentCoReview,
333 grpc::heddle::v1::ReviewKind::Unspecified => {
334 return Err(Status::invalid_argument("review kind is required"));
335 }
336 };
337
338 let change_id = parse_change_id(&req.state_id)?;
340 let repo = inner.repo();
341
342 let scope = match req.scope.as_ref() {
344 Some(s) => proto_scope_to_object(s)?,
345 None => ReviewScope::WholeChange,
346 };
347
348 let actor = repo
356 .get_principal()
357 .map_err(|err| Status::internal(format!("resolve caller principal: {err}")))?;
358 let justification = if req.justification.is_empty() {
359 None
360 } else {
361 Some(req.justification.clone())
362 };
363
364 let now = chrono::Utc::now().timestamp();
365 let signed_at = req.signed_at.as_ref().map(|t| t.seconds).unwrap_or(0);
366 if signed_at == 0 {
367 return Err(Status::invalid_argument(
368 "signed_at is required and must match the timestamp the client signed over",
369 ));
370 }
371 if (signed_at - now).abs() > SIGN_TIMESTAMP_SKEW_SECS {
372 return Err(Status::invalid_argument(format!(
373 "signed_at={signed_at} is too far from server time={now} (max skew {SIGN_TIMESTAMP_SKEW_SECS}s)"
374 )));
375 }
376
377 let new_sig = ReviewSignature {
378 actor,
379 kind,
380 scope: scope.clone(),
381 justification: justification.clone(),
382 signed_at,
383 algorithm: req.algorithm.clone(),
384 public_key: hex::encode(&req.public_key),
385 signature: hex::encode(&req.signature),
386 };
387 new_sig
388 .validate()
389 .map_err(|err| Status::invalid_argument(format!("invalid review signature: {err}")))?;
390
391 let public_key_bytes = req.public_key.clone();
392 let signature_bytes = req.signature.clone();
393 let payload = signing_payload(change_id, kind, &scope, signed_at, justification.as_deref());
394 verify_payload_signature(
395 &payload,
396 &req.algorithm,
397 &public_key_bytes,
398 &signature_bytes,
399 )
400 .map_err(|err| {
401 Status::invalid_argument(format!(
402 "review signature failed verification ({}): {err}",
403 req.algorithm
404 ))
405 })?;
406
407 let _lock = repo
413 .locker()
414 .write()
415 .map_err(|err| Status::internal(err.to_string()))?;
416 let state = repo
417 .store()
418 .get_state(&change_id)
419 .map_err(to_status)?
420 .ok_or_else(|| {
421 Status::not_found(format!("state {} not found", change_id.to_string_full()))
422 })?;
423 let mut blob = match state.review_signatures {
424 Some(hash) => {
425 let raw = repo
426 .store()
427 .get_blob(&hash)
428 .map_err(to_status)?
429 .ok_or_else(|| {
430 Status::internal(format!(
431 "existing review signatures blob {} missing from object store",
432 hash
433 ))
434 })?;
435 ReviewSignaturesBlob::decode(raw.content())
436 .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?
437 }
438 None => ReviewSignaturesBlob::new(Vec::new()),
439 };
440 blob.signatures.push(new_sig);
441 let new_index = blob.signatures.len() - 1;
442
443 let bytes = blob
445 .encode()
446 .map_err(|err| Status::internal(format!("encode review signatures: {err}")))?;
447 let content_hash = repo
448 .store()
449 .put_blob(&Blob::new(bytes))
450 .map_err(to_status)?;
451
452 let new_state = state.with_review_signatures(content_hash);
454 repo.store().put_state(&new_state).map_err(to_status)?;
455
456 Ok(SignStateResponse {
457 signature_id: synthetic_signature_id(new_index),
458 state_id: req.state_id,
459 })
460}
461
/// Derives a signature id from a signature's position in the stored blob:
/// the literal prefix `rs-` followed by the decimal index.
fn synthetic_signature_id(index: usize) -> String {
    let mut id = String::from("rs-");
    id.push_str(&index.to_string());
    id
}
468
469fn parse_change_id(s: &[u8]) -> Result<ChangeId, Status> {
470 ChangeId::try_from_slice(s)
471 .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))
472}
473
474fn proto_scope_to_object(scope: &ProtoReviewScope) -> Result<ReviewScope, Status> {
475 use grpc::heddle::v1::review_scope::Scope;
476 match scope.scope.as_ref() {
477 None | Some(Scope::WholeChange(_)) => Ok(ReviewScope::WholeChange),
478 Some(Scope::Symbols(list)) => {
479 if list.symbols.is_empty() {
480 return Err(Status::invalid_argument(
481 "symbols scope requires at least one symbol anchor",
482 ));
483 }
484 let symbols = list
485 .symbols
486 .iter()
487 .map(|s| SymbolAnchor::new(s.file.clone(), s.symbol.clone()))
488 .collect();
489 Ok(ReviewScope::Symbols(symbols))
490 }
491 }
492}
493
494fn object_scope_to_proto(scope: &ReviewScope) -> ProtoReviewScope {
495 use grpc::heddle::v1::review_scope::{Scope, SymbolList, WholeChange};
496 let inner = match scope {
497 ReviewScope::WholeChange => Scope::WholeChange(WholeChange {}),
498 ReviewScope::Symbols(symbols) => Scope::Symbols(SymbolList {
499 symbols: symbols
500 .iter()
501 .map(|s| ProtoPathSymbolRef {
502 file: s.file.clone(),
503 symbol: s.symbol.clone(),
504 })
505 .collect(),
506 }),
507 };
508 ProtoReviewScope { scope: Some(inner) }
509}
510
511fn review_signature_to_proto(sig: ReviewSignature, signature_id: String) -> ProtoReviewSignature {
512 ProtoReviewSignature {
513 signature_id,
514 actor_name: sig.actor.name.clone(),
515 actor_email: sig.actor.email.clone(),
516 kind: review_kind_to_proto(sig.kind) as i32,
517 scope: Some(object_scope_to_proto(&sig.scope)),
518 justification: sig.justification.unwrap_or_default(),
519 signed_at: Some(prost_types::Timestamp {
520 seconds: sig.signed_at,
521 nanos: 0,
522 }),
523 algorithm: sig.algorithm,
524 public_key: hex::decode(&sig.public_key).unwrap_or_default(),
525 signature: hex::decode(&sig.signature).unwrap_or_default(),
526 }
527}
528
529fn review_kind_to_proto(kind: ReviewKind) -> grpc::heddle::v1::ReviewKind {
530 match kind {
531 ReviewKind::Read => grpc::heddle::v1::ReviewKind::Read,
532 ReviewKind::AgentPreview => grpc::heddle::v1::ReviewKind::AgentPreview,
533 ReviewKind::AgentCoReview => grpc::heddle::v1::ReviewKind::AgentCoReview,
534 }
535}
536
537fn risk_signal_to_proto(sig: objects::object::RiskSignal, visibility: &str) -> ProtoRiskSignal {
538 let (start_line, end_line) = sig.anchor.line_range.unwrap_or((0, 0));
539 ProtoRiskSignal {
540 kind: sig.kind.as_str().to_string(),
541 anchor: Some(ProtoSignalAnchor {
542 file: sig.anchor.file,
543 symbol: sig.anchor.symbol.unwrap_or_default(),
544 start_line,
545 end_line,
546 }),
547 reason: sig.reason,
548 producer_module: sig.producer.module,
549 producer_version: sig.producer.version,
550 computed_at: Some(prost_types::Timestamp {
551 seconds: sig.computed_at,
552 nanos: 0,
553 }),
554 visibility: visibility.to_string(),
555 }
556}
557
558fn changed_files_as_symbols(
569 repo: &Repository,
570 state: &State,
571 changed_paths: &[ChangedPath],
572) -> objects::error::Result<Vec<PathSymbol>> {
573 let new_tree = match repo.store().get_tree(&state.tree)? {
574 Some(t) => t,
575 None => return Ok(Vec::new()),
576 };
577 let new_files = collect_files(repo, &new_tree, "")?;
578
579 let mut out: Vec<PathSymbol> = Vec::new();
580 for path_kind in changed_paths {
581 let path = &path_kind.path;
582 #[cfg_attr(not(feature = "semantic"), allow(unused_mut))]
583 let mut emitted_any = false;
584 if let Some(hash) = new_files.get(path) {
585 #[cfg(feature = "semantic")]
586 {
587 if let Some(blob) = repo.store().get_blob(hash)? {
588 emitted_any = extract_file_symbols(path, blob.content(), &mut out);
589 }
590 }
591 #[cfg(not(feature = "semantic"))]
592 {
593 let _ = hash;
594 }
595 }
596 if !emitted_any {
597 out.push(PathSymbol {
598 file: path.clone(),
599 symbol: path.clone(),
600 kind: SymbolKind::Other,
601 });
602 }
603 }
604 Ok(out)
605}
606
/// Extracts definitions from `source` and appends one `PathSymbol` per
/// definition to `out`.
///
/// A definition with a non-empty parent name is recorded as
/// `parent::name`; otherwise just `name`. Returns `false`, leaving `out`
/// untouched, when extraction fails or finds no definitions; returns `true`
/// otherwise.
#[cfg(feature = "semantic")]
fn extract_file_symbols(path: &str, source: &[u8], out: &mut Vec<PathSymbol>) -> bool {
    use ::semantic::symbol_resolver::{Definition, DefinitionKind, extract_definitions};

    let parsed: Result<Vec<Definition>, _> =
        extract_definitions(source, std::path::Path::new(path));
    let Ok(definitions) = parsed else {
        return false;
    };
    if definitions.is_empty() {
        return false;
    }

    for definition in definitions {
        let kind = match definition.kind {
            DefinitionKind::Function => SymbolKind::Function,
            DefinitionKind::Type => SymbolKind::Type,
            DefinitionKind::TypeAlias => SymbolKind::TypeAlias,
            DefinitionKind::Trait => SymbolKind::Trait,
            DefinitionKind::Class => SymbolKind::Class,
            DefinitionKind::Interface => SymbolKind::Interface,
            DefinitionKind::EnumDef => SymbolKind::EnumDef,
            DefinitionKind::ConstDecl => SymbolKind::ConstDecl,
            DefinitionKind::Module => SymbolKind::Module,
            DefinitionKind::Other => SymbolKind::Other,
        };
        let qualifier = definition
            .parent_name
            .as_deref()
            .filter(|parent| !parent.is_empty());
        let symbol = match qualifier {
            Some(parent) => format!("{parent}::{}", definition.name),
            None => definition.name,
        };
        out.push(PathSymbol {
            file: path.to_string(),
            symbol,
            kind,
        });
    }
    true
}
643
644fn collect_files(
645 repo: &Repository,
646 tree: &objects::object::Tree,
647 prefix: &str,
648) -> objects::error::Result<std::collections::HashMap<String, objects::object::ContentHash>> {
649 let mut out = std::collections::HashMap::new();
650 for entry in tree.entries() {
651 let path = if prefix.is_empty() {
652 entry.name().to_string()
653 } else {
654 format!("{prefix}/{}", entry.name())
655 };
656 if entry.is_tree() {
657 if let Some(hash) = entry.tree_hash()
658 && let Some(subtree) = repo.store().get_tree(&hash)?
659 {
660 let sub = collect_files(repo, &subtree, &path)?;
661 out.extend(sub);
662 }
663 } else if let Some(hash) = entry.content_hash() {
664 out.insert(path, hash);
665 }
666 }
667 Ok(out)
668}
669
670fn partition_to_proto(p: ReadingOrderPartition) -> ProtoReadingOrderPartition {
671 ProtoReadingOrderPartition {
672 structural: p.structural.iter().map(path_symbol_to_proto).collect(),
673 consequence: p.consequence.iter().map(path_symbol_to_proto).collect(),
674 tests_and_docs: p.tests_and_docs.iter().map(path_symbol_to_proto).collect(),
675 }
676}
677
678fn path_symbol_to_proto(p: &PathSymbol) -> ProtoPathSymbolRef {
679 ProtoPathSymbolRef {
680 file: p.file.clone(),
681 symbol: p.symbol.clone(),
682 }
683}
684
685fn discussion_to_anchored_proto(d: &Discussion) -> AnchoredDiscussion {
686 AnchoredDiscussion {
687 id: d.id.clone(),
688 anchor: Some(ProtoPathSymbolRef {
689 file: d.anchor.file.clone(),
690 symbol: d.anchor.symbol.clone(),
691 }),
692 opened_against_state: d.opened_against_state.as_bytes().to_vec(),
693 opened_at: Some(prost_types::Timestamp {
694 seconds: d.opened_at,
695 nanos: 0,
696 }),
697 turns: d
698 .turns
699 .iter()
700 .map(|t| grpc::heddle::v1::DiscussionTurn {
701 author_name: t.author.name.clone(),
702 author_email: t.author.email.clone(),
703 body: t.body.clone(),
704 posted_at: Some(prost_types::Timestamp {
705 seconds: t.posted_at,
706 nanos: 0,
707 }),
708 })
709 .collect(),
710 resolution: Some(discussion_resolution_to_proto(&d.resolution)),
711 body_changed_since_open: d.body_changed_since_open,
712 orphaned: d.orphaned,
713 visibility: d.visibility.as_str().to_string(),
714 }
715}
716
717fn discussion_resolution_to_proto(
718 resolution: &DiscussionResolution,
719) -> grpc::heddle::v1::DiscussionResolution {
720 use grpc::heddle::v1::discussion_resolution::{
721 Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
722 };
723 let state = match resolution {
724 DiscussionResolution::Open => State::Open(Open {}),
725 DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
726 State::IntoAnnotation(ResolvedIntoAnnotation {
727 annotation_id: annotation_id.clone(),
728 })
729 }
730 DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
731 state_id: state_id.as_bytes().to_vec(),
732 }),
733 DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
734 reason: reason.clone(),
735 }),
736 };
737 grpc::heddle::v1::DiscussionResolution { state: Some(state) }
738}
739
/// One changed file in a state's diff, as recorded by
/// `compute_state_diff_summary`.
#[derive(Debug, Clone)]
struct ChangedPath {
    /// Path of the changed file, copied from the diff entry.
    path: String,
    /// How the file changed. `compute_state_diff_summary` skips `Unchanged`
    /// entries, so it never records that kind here.
    kind: DiffKind,
}
752
753impl ChangedPath {
754 fn kind_str(&self) -> &'static str {
755 match self.kind {
756 DiffKind::Added => "added",
757 DiffKind::Modified => "modified",
758 DiffKind::Deleted => "deleted",
759 DiffKind::Unchanged => "unchanged",
760 }
761 }
762}
763
/// Aggregate diff statistics for a state, produced by
/// `compute_state_diff_summary`.
struct DiffSummary {
    /// Number of entries in `changed_paths`.
    files_changed: u32,
    /// Count of diff entries of kind `Added`.
    added_files: u32,
    /// Count of diff entries of kind `Modified`.
    modified_files: u32,
    /// Count of diff entries of kind `Deleted`.
    deleted_files: u32,
    /// Lines in added files plus added lines in modified files (saturating).
    added_lines: u32,
    /// Lines in deleted files plus removed lines in modified files (saturating).
    removed_lines: u32,
    /// Every added, modified, or deleted path, in diff order.
    changed_paths: Vec<ChangedPath>,
}
779
780fn compute_state_diff_summary(
789 repo: &Repository,
790 state: &State,
791) -> objects::error::Result<DiffSummary> {
792 use objects::object::Tree;
793 let parent_tree_hash = if let Some(parent_id) = state.parents.first() {
794 match repo.store().get_state(parent_id)? {
795 Some(parent_state) => parent_state.tree,
796 None => Tree::new().hash(),
797 }
798 } else {
799 Tree::new().hash()
800 };
801
802 let parent_tree_obj = repo.store().get_tree(&parent_tree_hash)?;
808 let new_tree_obj = repo.store().get_tree(&state.tree)?;
809
810 let changes = if parent_tree_obj.is_some() && new_tree_obj.is_some() {
816 repo.diff_trees(&parent_tree_hash, &state.tree)?
817 } else {
818 objects::object::FileChangeSet::new()
819 };
820
821 let mut added_lines: u32 = 0;
828 let mut removed_lines: u32 = 0;
829 let mut changed_paths: Vec<ChangedPath> = Vec::with_capacity(changes.len());
830
831 let parent_files = match parent_tree_obj.as_ref() {
832 Some(t) => collect_files(repo, t, "")?,
833 None => std::collections::HashMap::new(),
834 };
835 let new_files = match new_tree_obj.as_ref() {
836 Some(t) => collect_files(repo, t, "")?,
837 None => std::collections::HashMap::new(),
838 };
839
840 let mut added_files: u32 = 0;
841 let mut modified_files: u32 = 0;
842 let mut deleted_files: u32 = 0;
843
844 for change in changes.iter() {
845 match change.kind {
846 DiffKind::Added => {
847 added_files += 1;
848 if let Some(hash) = new_files.get(&change.path)
855 && let Some(blob) = repo.store().get_blob(hash)?
856 {
857 added_lines = added_lines.saturating_add(line_count(blob.content()));
858 }
859 }
860 DiffKind::Deleted => {
861 deleted_files += 1;
862 if let Some(hash) = parent_files.get(&change.path)
863 && let Some(blob) = repo.store().get_blob(hash)?
864 {
865 removed_lines = removed_lines.saturating_add(line_count(blob.content()));
866 }
867 }
868 DiffKind::Modified => {
869 modified_files += 1;
870 let old_blob = match parent_files.get(&change.path) {
877 Some(h) => repo.store().get_blob(h)?,
878 None => None,
879 };
880 let new_blob = match new_files.get(&change.path) {
881 Some(h) => repo.store().get_blob(h)?,
882 None => None,
883 };
884 if let (Some(old), Some(new)) = (old_blob, new_blob) {
885 for line in diff_blobs(&old, &new) {
886 match line {
887 objects::worktree::DiffLine::Added(_) => {
888 added_lines = added_lines.saturating_add(1);
889 }
890 objects::worktree::DiffLine::Removed(_) => {
891 removed_lines = removed_lines.saturating_add(1);
892 }
893 objects::worktree::DiffLine::Context(_) => {}
894 }
895 }
896 }
897 }
898 DiffKind::Unchanged => continue,
899 }
900 changed_paths.push(ChangedPath {
901 path: change.path.clone(),
902 kind: change.kind,
903 });
904 }
905
906 Ok(DiffSummary {
907 files_changed: changed_paths.len() as u32,
908 added_files,
909 modified_files,
910 deleted_files,
911 added_lines,
912 removed_lines,
913 changed_paths,
914 })
915}
916
/// Counts the lines in `content`.
///
/// Content that is empty or not valid UTF-8 counts as zero lines. A single
/// trailing newline terminates the last line rather than starting a new one,
/// so `"a\n"` and `"a"` both count as one line and `"\n"` counts as one.
fn line_count(content: &[u8]) -> u32 {
    match std::str::from_utf8(content) {
        Err(_) | Ok("") => 0,
        Ok(text) => {
            // Drop at most one trailing newline, then count line separators.
            let body = text.strip_suffix('\n').unwrap_or(text);
            let separators = body.bytes().filter(|&byte| byte == b'\n').count() as u32;
            separators.saturating_add(1)
        }
    }
}
934
935#[cfg(test)]
940mod tests {
941 use std::sync::Arc;
942
943 use crypto::Signer as _;
944 use grpc::heddle::v1::ReviewScope as ProtoReviewScope;
945 use repo::{Repository, operation_dedup::OperationDedupStore};
946 use tempfile::TempDir;
947
948 use super::*;
949
950 fn fresh_service() -> (LocalStateReviewService, Arc<Repository>, TempDir) {
951 let temp = TempDir::new().expect("create tempdir");
952 unsafe {
956 std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
957 std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
958 }
959 let repo = Repository::init_default(temp.path()).expect("init repo");
960 let dedup = OperationDedupStore::open(repo.heddle_dir()).expect("open dedup");
961 let repo = Arc::new(repo);
962 let svc =
963 LocalStateReviewService::new(GrpcLocalService::new(repo.clone(), Arc::new(dedup)));
964 (svc, repo, temp)
965 }
966
967 fn capture_state(repo: &Repository) -> ChangeId {
968 std::fs::write(repo.root().join("hello.txt"), b"hi").expect("write file");
970 let state = repo
971 .snapshot(Some("seed".to_string()), None)
972 .expect("snapshot");
973 state.change_id
974 }
975
976 fn sign_request(state_id: &ChangeId, op_id: &str) -> SignStateRequest {
977 let signer = crypto::Ed25519Signer::generate().expect("generate ed25519 key");
978 let scope = ReviewScope::WholeChange;
979 let signed_at = chrono::Utc::now().timestamp();
980 let payload = signing_payload(*state_id, ReviewKind::Read, &scope, signed_at, None);
981 let signature = signer.sign(&payload).expect("sign payload");
982 use grpc::heddle::v1::review_scope::{Scope, WholeChange};
983 SignStateRequest {
984 repo_path: String::new(),
985 state_id: state_id.as_bytes().to_vec(),
986 kind: grpc::heddle::v1::ReviewKind::Read as i32,
987 scope: Some(ProtoReviewScope {
988 scope: Some(Scope::WholeChange(WholeChange {})),
989 }),
990 justification: String::new(),
991 algorithm: "ed25519".to_string(),
992 public_key: signer.public_key().to_vec(),
993 signature: signature.clone(),
994 signed_at: Some(prost_types::Timestamp {
995 seconds: signed_at,
996 nanos: 0,
997 }),
998 client_operation_id: op_id.to_string(),
999 }
1000 }
1001
1002 #[tokio::test]
1003 #[serial_test::serial(process_global)]
1004 async fn sign_state_persists_to_review_signatures_blob() {
1005 let (svc, repo, _tmp) = fresh_service();
1006 let state_id = capture_state(&repo);
1007
1008 let resp = svc
1009 .sign_state(Request::new(sign_request(&state_id, "")))
1010 .await
1011 .expect("sign_state");
1012 assert!(!resp.get_ref().signature_id.is_empty());
1013 assert_eq!(resp.get_ref().state_id, state_id.as_bytes().to_vec());
1014
1015 let listing = svc
1016 .list_signatures(Request::new(ListSignaturesRequest {
1017 repo_path: String::new(),
1018 state_id: state_id.as_bytes().to_vec(),
1019 }))
1020 .await
1021 .expect("list_signatures");
1022 let sigs = &listing.get_ref().signatures;
1023 assert_eq!(sigs.len(), 1, "expected one signature, got {sigs:?}");
1024 assert_eq!(sigs[0].kind, grpc::heddle::v1::ReviewKind::Read as i32);
1025 assert_eq!(sigs[0].algorithm, "ed25519");
1026 assert_eq!(sigs[0].actor_name, "Alice Tester");
1027 assert_eq!(sigs[0].actor_email, "alice@example.com");
1028 let scope_case = sigs[0].scope.as_ref().and_then(|s| s.scope.as_ref());
1029 assert!(matches!(
1030 scope_case,
1031 Some(grpc::heddle::v1::review_scope::Scope::WholeChange(_))
1032 ));
1033 }
1034
1035 #[tokio::test]
1036 #[serial_test::serial(process_global)]
1037 async fn sign_state_idempotent() {
1038 let (svc, repo, _tmp) = fresh_service();
1039 let state_id = capture_state(&repo);
1040 let op_id = objects::object::OperationId::new().to_string();
1041 let req = sign_request(&state_id, &op_id);
1044
1045 let first = svc
1046 .sign_state(Request::new(req.clone()))
1047 .await
1048 .expect("first sign_state");
1049 let second = svc
1050 .sign_state(Request::new(req))
1051 .await
1052 .expect("second sign_state");
1053 assert_eq!(
1054 first.get_ref().signature_id,
1055 second.get_ref().signature_id,
1056 "replayed call must return the same signature_id"
1057 );
1058
1059 let listing = svc
1060 .list_signatures(Request::new(ListSignaturesRequest {
1061 repo_path: String::new(),
1062 state_id: state_id.as_bytes().to_vec(),
1063 }))
1064 .await
1065 .expect("list_signatures");
1066 assert_eq!(
1067 listing.get_ref().signatures.len(),
1068 1,
1069 "idempotent replay must not append a duplicate signature"
1070 );
1071 }
1072
1073 #[tokio::test]
1074 #[serial_test::serial(process_global)]
1075 async fn sign_state_rejects_forged_signature() {
1076 let (svc, repo, _tmp) = fresh_service();
1077 let state_id = capture_state(&repo);
1078 let mut req = sign_request(&state_id, "");
1079 let last = req.signature.len() - 1;
1081 req.signature[last] ^= 0xff;
1082
1083 let err = svc
1084 .sign_state(Request::new(req))
1085 .await
1086 .expect_err("forged signature must be rejected");
1087 assert_eq!(err.code(), tonic::Code::InvalidArgument, "{err:?}");
1088 assert!(
1089 err.message().contains("failed verification"),
1090 "unexpected error message: {}",
1091 err.message()
1092 );
1093 }
1094
1095 #[tokio::test]
1096 #[serial_test::serial(process_global)]
1097 async fn sign_state_rejects_skewed_timestamp() {
1098 let (svc, repo, _tmp) = fresh_service();
1099 let state_id = capture_state(&repo);
1100 let mut req = sign_request(&state_id, "");
1101 if let Some(ts) = req.signed_at.as_mut() {
1103 ts.seconds += 60 * 60;
1104 }
1105
1106 let err = svc
1107 .sign_state(Request::new(req))
1108 .await
1109 .expect_err("skewed timestamp must be rejected");
1110 assert_eq!(err.code(), tonic::Code::InvalidArgument);
1111 assert!(err.message().contains("too far from server time"));
1112 }
1113
1114 #[tokio::test]
1115 #[serial_test::serial(process_global)]
1116 async fn sign_state_attributes_to_caller_not_state_author() {
1117 let (svc, repo, _tmp) = fresh_service();
1123 let state_id = capture_state(&repo);
1127
1128 unsafe {
1131 std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Bob Signer");
1132 std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "bob@example.com");
1133 }
1134
1135 svc.sign_state(Request::new(sign_request(&state_id, "")))
1136 .await
1137 .expect("sign_state");
1138
1139 let listing = svc
1140 .list_signatures(Request::new(ListSignaturesRequest {
1141 repo_path: String::new(),
1142 state_id: state_id.as_bytes().to_vec(),
1143 }))
1144 .await
1145 .expect("list_signatures");
1146 let sigs = &listing.get_ref().signatures;
1147 assert_eq!(sigs.len(), 1);
1148 assert_eq!(
1149 sigs[0].actor_name, "Bob Signer",
1150 "signature must attribute to the caller (Bob), not the state author (Alice)"
1151 );
1152 assert_eq!(sigs[0].actor_email, "bob@example.com");
1153
1154 unsafe {
1157 std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
1158 std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
1159 }
1160 }
1161
1162 #[tokio::test]
1163 #[serial_test::serial(process_global)]
1164 async fn sign_state_serializes_concurrent_appends() {
1165 let (svc, repo, _tmp) = fresh_service();
1172 let state_id = capture_state(&repo);
1173
1174 let op_a = objects::object::OperationId::new().to_string();
1176 let op_b = objects::object::OperationId::new().to_string();
1177 let req_a = sign_request(&state_id, &op_a);
1178 let req_b = sign_request(&state_id, &op_b);
1179
1180 let svc_a = svc.clone();
1181 let svc_b = svc.clone();
1182 let (a, b) = tokio::join!(
1183 svc_a.sign_state(Request::new(req_a)),
1184 svc_b.sign_state(Request::new(req_b)),
1185 );
1186 a.expect("first sign_state");
1187 b.expect("second sign_state");
1188
1189 let listing = svc
1190 .list_signatures(Request::new(ListSignaturesRequest {
1191 repo_path: String::new(),
1192 state_id: state_id.as_bytes().to_vec(),
1193 }))
1194 .await
1195 .expect("list_signatures");
1196 assert_eq!(
1197 listing.get_ref().signatures.len(),
1198 2,
1199 "both concurrent signatures must land — neither should be lost \
1200 to a stale-blob clobber"
1201 );
1202 }
1203
1204 #[tokio::test]
1212 #[serial_test::serial(process_global)]
1213 async fn get_review_payload_populates_diff_summary_and_signals() {
1214 let (svc, repo, _tmp) = fresh_service();
1215
1216 std::fs::write(repo.root().join("hello.txt"), b"first\nsecond\nthird\n")
1219 .expect("write hello.txt");
1220 let first = repo
1221 .snapshot(Some("first capture".to_string()), None)
1222 .expect("first snapshot");
1223
1224 let resp_first = svc
1225 .get_review_payload(Request::new(GetReviewPayloadRequest {
1226 repo_path: String::new(),
1227 state_id: first.change_id.as_bytes().to_vec(),
1228 include_all_signals: false,
1229 }))
1230 .await
1231 .expect("get_review_payload first");
1232 let payload_first = resp_first.into_inner();
1233 let summary_first = payload_first.summary.as_ref().expect("summary present");
1234 assert!(
1235 summary_first.files_changed >= 1,
1236 "first state should report at least one file changed (vs empty parent), got {}",
1237 summary_first.files_changed
1238 );
1239 assert!(
1240 summary_first.added_lines >= 3,
1241 "first state should report 3+ added lines, got {}",
1242 summary_first.added_lines
1243 );
1244 assert!(
1245 !payload_first.in_budget_signals.is_empty(),
1246 "in_budget_signals must include a diff_summary entry"
1247 );
1248 let first_signal = &payload_first.in_budget_signals[0];
1249 assert!(
1250 first_signal.kind.starts_with("diff_summary."),
1251 "expected synthetic diff_summary signal kind, got {}",
1252 first_signal.kind
1253 );
1254 assert_eq!(first_signal.producer_module, "review_show.diff_summary");
1255 assert_eq!(first_signal.visibility, "visible");
1256
1257 std::fs::write(
1261 repo.root().join("hello.txt"),
1262 b"first\nsecond\nthird\nfourth\n",
1263 )
1264 .expect("modify hello.txt");
1265 let second = repo
1266 .snapshot(Some("second capture".to_string()), None)
1267 .expect("second snapshot");
1268
1269 let resp_second = svc
1270 .get_review_payload(Request::new(GetReviewPayloadRequest {
1271 repo_path: String::new(),
1272 state_id: second.change_id.as_bytes().to_vec(),
1273 include_all_signals: false,
1274 }))
1275 .await
1276 .expect("get_review_payload second");
1277 let payload_second = resp_second.into_inner();
1278 let summary_second = payload_second.summary.as_ref().expect("summary present");
1279 assert_eq!(
1280 summary_second.files_changed, 1,
1281 "second state should report exactly one modified file"
1282 );
1283 assert!(
1284 summary_second.added_lines >= 1,
1285 "second state should report at least one added line, got {}",
1286 summary_second.added_lines
1287 );
1288 assert!(
1289 !payload_second.in_budget_signals.is_empty(),
1290 "in_budget_signals must include a diff_summary entry"
1291 );
1292 let signal = &payload_second.in_budget_signals[0];
1293 assert_eq!(
1294 signal
1295 .anchor
1296 .as_ref()
1297 .map(|a| a.file.as_str())
1298 .unwrap_or(""),
1299 "hello.txt",
1300 "diff_summary signal should anchor on the modified file"
1301 );
1302 assert!(
1303 signal.reason.contains("files changed"),
1304 "first signal reason should carry the aggregate summary, got {}",
1305 signal.reason
1306 );
1307 assert_eq!(
1311 summary_second.in_budget_signal_count,
1312 payload_second.in_budget_signals.len() as u32,
1313 "in_budget_signal_count must match the array length"
1314 );
1315 }
1316
1317 #[tokio::test]
1318 #[serial_test::serial(process_global)]
1319 async fn get_review_payload_surfaces_gitlink_target_changes() {
1320 let (svc, repo, _tmp) = fresh_service();
1321
1322 let old_target = "0303030303030303030303030303030303030303"
1323 .parse()
1324 .expect("old git oid");
1325 let new_target = "0404040404040404040404040404040404040404"
1326 .parse()
1327 .expect("new git oid");
1328 let old_tree = objects::object::Tree::from_entries(vec![
1329 objects::object::TreeEntry::gitlink("vendor", old_target).expect("old gitlink"),
1330 ]);
1331 let new_tree = objects::object::Tree::from_entries(vec![
1332 objects::object::TreeEntry::gitlink("vendor", new_target).expect("new gitlink"),
1333 ]);
1334 let old_tree_hash = repo.store().put_tree(&old_tree).expect("put old tree");
1335 let new_tree_hash = repo.store().put_tree(&new_tree).expect("put new tree");
1336 let base = State::new_snapshot(
1337 old_tree_hash,
1338 Vec::new(),
1339 objects::object::Attribution::human(objects::object::Principal::new(
1340 "Gitlink Reviewer",
1341 "gitlink@example.test",
1342 )),
1343 );
1344 let base_id = base.change_id;
1345 repo.store().put_state(&base).expect("put base state");
1346 let changed = State::new_snapshot(
1347 new_tree_hash,
1348 vec![base_id],
1349 objects::object::Attribution::human(objects::object::Principal::new(
1350 "Gitlink Reviewer",
1351 "gitlink@example.test",
1352 )),
1353 );
1354 let changed_id = changed.change_id;
1355 repo.store().put_state(&changed).expect("put changed state");
1356
1357 let resp = svc
1358 .get_review_payload(Request::new(GetReviewPayloadRequest {
1359 repo_path: String::new(),
1360 state_id: changed_id.as_bytes().to_vec(),
1361 include_all_signals: false,
1362 }))
1363 .await
1364 .expect("get_review_payload gitlink change");
1365 let payload = resp.into_inner();
1366 let summary = payload.summary.as_ref().expect("summary present");
1367 assert_eq!(
1368 summary.files_changed, 1,
1369 "gitlink pointer change should count as one changed path"
1370 );
1371 assert_eq!(summary.added_lines, 0);
1372 assert_eq!(summary.removed_lines, 0);
1373 assert_eq!(
1374 payload
1375 .in_budget_signals
1376 .first()
1377 .and_then(|signal| signal.anchor.as_ref())
1378 .map(|anchor| anchor.file.as_str()),
1379 Some("vendor"),
1380 "diff_summary signal should be anchored on the gitlink path"
1381 );
1382 let partition = payload.partition.expect("partition present");
1383 let surfaced = partition
1384 .structural
1385 .iter()
1386 .chain(partition.consequence.iter())
1387 .chain(partition.tests_and_docs.iter())
1388 .any(|symbol| symbol.file == "vendor" && symbol.symbol == "vendor");
1389 assert!(surfaced, "gitlink change should be path-visible in review");
1390 }
1391
1392 #[tokio::test]
1404 #[serial_test::serial(process_global)]
1405 async fn get_review_payload_tolerates_missing_tree() {
1406 let (svc, repo, _tmp) = fresh_service();
1407 let state_id = capture_state(&repo);
1408
1409 let state = repo
1414 .store()
1415 .get_state(&state_id)
1416 .expect("get state")
1417 .expect("state present");
1418 let bogus_tree = objects::object::ContentHash::compute(b"definitely-not-in-store-bytes");
1419 let mut mutated = state.clone();
1420 mutated.tree = bogus_tree;
1421 repo.store().put_state(&mutated).expect("put mutated state");
1422
1423 let resp = svc
1426 .get_review_payload(Request::new(GetReviewPayloadRequest {
1427 repo_path: String::new(),
1428 state_id: state_id.as_bytes().to_vec(),
1429 include_all_signals: false,
1430 }))
1431 .await
1432 .expect("missing tree must not block review payload");
1433 let payload = resp.into_inner();
1434 let summary = payload.summary.as_ref().expect("summary present");
1435 assert_eq!(
1436 summary.files_changed, 0,
1437 "missing tree must produce a zero-change summary, got {} files",
1438 summary.files_changed
1439 );
1440 assert!(
1444 !payload.in_budget_signals.is_empty(),
1445 "in_budget_signals should always contain at least the synthetic diff_summary entry"
1446 );
1447 let kind = &payload.in_budget_signals[0].kind;
1448 assert!(
1449 kind.starts_with("diff_summary."),
1450 "expected synthetic diff_summary signal, got {kind}"
1451 );
1452 }
1453
/// Table-driven check of `line_count`: empty input counts as zero lines, a
/// trailing newline does not add a line, an unterminated final line still
/// counts, and the three-byte `0xff 0xfe 0xfd` input counts as zero
/// (presumably because it is treated as non-text — confirm against
/// `line_count`).
#[test]
fn line_count_matches_git_semantics() {
    let cases: [(&[u8], _); 8] = [
        (b"", 0),
        (b"\n", 1),
        (b"hello", 1),
        (b"hello\n", 1),
        (b"hello\nworld", 2),
        (b"hello\nworld\n", 2),
        (b"a\nb\nc\n", 3),
        (&[0xff, 0xfe, 0xfd], 0),
    ];

    for (input, expected) in cases {
        // The message names the failing input, since every case shares one assert.
        assert_eq!(line_count(input), expected, "line_count({input:?})");
    }
}
1469}