1use ::state_review::{
12 PathSymbol, ReadingOrderPartition, SymbolKind, build_review_payload_partition,
13};
14use crypto::verify_payload_signature;
15use grpc::heddle::v1::{
16 AnchoredDiscussion, GetReviewPayloadRequest, ListSignaturesRequest, ListSignaturesResponse,
17 MergeRequirement, PathSymbolRef as ProtoPathSymbolRef,
18 ReadingOrderPartition as ProtoReadingOrderPartition, ReviewPayload,
19 ReviewScope as ProtoReviewScope, ReviewSignature as ProtoReviewSignature, ReviewSummary,
20 RiskSignal as ProtoRiskSignal, SignStateRequest, SignStateResponse,
21 SignalAnchor as ProtoSignalAnchor, SigningFooter,
22 state_review_service_server::StateReviewService,
23};
24use objects::{
25 lock::RepositoryLockExt,
26 object::{
27 Blob, ChangeId, DiffKind, Discussion, DiscussionResolution, DiscussionsBlob, ReviewKind,
28 ReviewScope, ReviewSignature, ReviewSignaturesBlob, RiskSignalBlob, State, SymbolAnchor,
29 signing_payload,
30 },
31 store::ObjectStore,
32 worktree::diff_blobs,
33};
34use prost::Message;
35use repo::Repository;
36use tonic::{Request, Response, Status};
37
38use super::{GrpcLocalService, to_status, with_idempotency};
39
40const SIGN_TIMESTAMP_SKEW_SECS: i64 = 5 * 60;
44
45#[derive(Clone)]
47pub struct LocalStateReviewService {
48 inner: GrpcLocalService,
49}
50
51impl LocalStateReviewService {
52 pub fn new(inner: GrpcLocalService) -> Self {
53 Self { inner }
54 }
55}
56
57#[tonic::async_trait]
58impl StateReviewService for LocalStateReviewService {
59 async fn get_review_payload(
60 &self,
61 request: Request<GetReviewPayloadRequest>,
62 ) -> Result<Response<ReviewPayload>, Status> {
63 let req = request.into_inner();
64 let change_id = parse_change_id(&req.state_id)?;
65 let repo = self.inner.repo();
66 let state = repo
67 .store()
68 .get_state(&change_id)
69 .map_err(to_status)?
70 .ok_or_else(|| {
71 Status::not_found(format!("state {} not found", change_id.to_string_full()))
72 })?;
73
74 let diff_summary = compute_state_diff_summary(repo, &state).map_err(to_status)?;
80
81 let summary = ReviewSummary {
82 headline: state.intent.clone().unwrap_or_default(),
83 files_changed: diff_summary.files_changed,
84 added_lines: diff_summary.added_lines,
85 removed_lines: diff_summary.removed_lines,
86 in_budget_signal_count: 0,
87 hidden_signal_count: 0,
88 };
89
90 let agent_narrative = if state.attribution.agent.is_some() {
91 state.intent.clone().unwrap_or_default()
92 } else {
93 String::new()
94 };
95
96 let mut all_signals: Vec<ProtoRiskSignal> = Vec::new();
100 if req.include_all_signals
101 && let Some(hash) = state.risk_signals
102 && let Some(blob) = repo.store().get_blob(&hash).map_err(to_status)?
103 {
104 let decoded = RiskSignalBlob::decode(blob.content())
105 .map_err(|err| Status::internal(format!("decode risk signals: {err}")))?;
106 all_signals = decoded
107 .signals
108 .into_iter()
109 .map(|s| risk_signal_to_proto(s, "visible"))
110 .collect();
111 }
112
113 let mut in_budget_signals: Vec<ProtoRiskSignal> = Vec::new();
122 let summary_kind = match (
123 diff_summary.added_files,
124 diff_summary.modified_files,
125 diff_summary.deleted_files,
126 ) {
127 (a, 0, 0) if a > 0 => "diff_summary.added_only",
128 (0, m, 0) if m > 0 => "diff_summary.modified_only",
129 (0, 0, d) if d > 0 => "diff_summary.deleted_only",
130 (0, 0, 0) => "diff_summary.empty",
131 _ => "diff_summary.mixed",
132 };
133 let summary_reason = format!(
134 "{} files changed (+{}/-{}, {} added, {} modified, {} deleted)",
135 diff_summary.files_changed,
136 diff_summary.added_lines,
137 diff_summary.removed_lines,
138 diff_summary.added_files,
139 diff_summary.modified_files,
140 diff_summary.deleted_files,
141 );
142 const MAX_DIFF_SIGNAL_ANCHORS: usize = 32;
147 if diff_summary.changed_paths.is_empty() {
148 in_budget_signals.push(ProtoRiskSignal {
149 kind: summary_kind.to_string(),
150 anchor: Some(ProtoSignalAnchor {
151 file: String::new(),
152 symbol: String::new(),
153 start_line: 0,
154 end_line: 0,
155 }),
156 reason: summary_reason.clone(),
157 producer_module: "review_show.diff_summary".to_string(),
158 producer_version: 1,
159 computed_at: None,
160 visibility: "visible".to_string(),
161 });
162 } else {
163 for (idx, path_kind) in diff_summary
164 .changed_paths
165 .iter()
166 .take(MAX_DIFF_SIGNAL_ANCHORS)
167 .enumerate()
168 {
169 let reason = if idx == 0 {
170 summary_reason.clone()
171 } else {
172 format!("{} ({})", path_kind.path, path_kind.kind_str())
173 };
174 in_budget_signals.push(ProtoRiskSignal {
175 kind: summary_kind.to_string(),
176 anchor: Some(ProtoSignalAnchor {
177 file: path_kind.path.clone(),
178 symbol: String::new(),
179 start_line: 0,
180 end_line: 0,
181 }),
182 reason,
183 producer_module: "review_show.diff_summary".to_string(),
184 producer_version: 1,
185 computed_at: None,
186 visibility: "visible".to_string(),
187 });
188 }
189 }
190
191 let symbols = changed_files_as_symbols(repo, &state).map_err(to_status)?;
195 let partition = build_review_payload_partition(&symbols);
196
197 let discussions = match state.discussions {
200 Some(hash) => {
201 let blob = repo
202 .store()
203 .get_blob(&hash)
204 .map_err(to_status)?
205 .ok_or_else(|| {
206 Status::internal(format!(
207 "discussions blob {} referenced by state {} is missing",
208 hash,
209 state.change_id.to_string_full()
210 ))
211 })?;
212 let decoded = DiscussionsBlob::decode(blob.content())
213 .map_err(|err| Status::internal(format!("decode discussions: {err}")))?;
214 decoded
215 .discussions
216 .iter()
217 .map(discussion_to_anchored_proto)
218 .collect()
219 }
220 None => Vec::<AnchoredDiscussion>::new(),
221 };
222
223 let mut summary = summary;
224 summary.in_budget_signal_count = in_budget_signals.len() as u32;
225 summary.hidden_signal_count =
226 all_signals.len().saturating_sub(in_budget_signals.len()) as u32;
227
228 let payload = ReviewPayload {
229 state_id: req.state_id.clone(),
230 summary: Some(summary),
231 agent_narrative,
232 partition: Some(partition_to_proto(partition)),
233 in_budget_signals,
234 all_signals,
235 tick_budget: 3,
236 discussions,
237 merge_requirements: Vec::<MergeRequirement>::new(),
240 signing_footer: Some(SigningFooter {
241 available_kinds: vec![
242 grpc::heddle::v1::ReviewKind::Read as i32,
243 grpc::heddle::v1::ReviewKind::AgentPreview as i32,
244 grpc::heddle::v1::ReviewKind::AgentCoReview as i32,
245 ],
246 }),
247 };
248
249 Ok(Response::new(payload))
250 }
251
252 async fn sign_state(
253 &self,
254 request: Request<SignStateRequest>,
255 ) -> Result<Response<SignStateResponse>, Status> {
256 let req = request.into_inner();
257 let req_bytes = req.encode_to_vec();
258 let client_operation_id = req.client_operation_id.clone();
259 let inner = self.inner.clone();
260
261 let response = with_idempotency(
262 &self.inner,
263 &client_operation_id,
264 "state_review.sign_state",
265 &req_bytes,
266 move || {
267 let inner = inner.clone();
268 async move { execute_sign_state(&inner, req).await }
269 },
270 )
271 .await?;
272
273 Ok(Response::new(response))
274 }
275
276 async fn list_signatures(
277 &self,
278 request: Request<ListSignaturesRequest>,
279 ) -> Result<Response<ListSignaturesResponse>, Status> {
280 let req = request.into_inner();
281 let change_id = parse_change_id(&req.state_id)?;
282 let repo = self.inner.repo();
283 let state = repo
284 .store()
285 .get_state(&change_id)
286 .map_err(to_status)?
287 .ok_or_else(|| {
288 Status::not_found(format!("state {} not found", change_id.to_string_full()))
289 })?;
290
291 let signatures = match state.review_signatures {
292 Some(hash) => {
293 let blob = repo
294 .store()
295 .get_blob(&hash)
296 .map_err(to_status)?
297 .ok_or_else(|| {
298 Status::internal(format!(
299 "review signatures blob {} missing from object store",
300 hash
301 ))
302 })?;
303 let decoded = ReviewSignaturesBlob::decode(blob.content())
304 .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?;
305 decoded
306 .signatures
307 .into_iter()
308 .enumerate()
309 .map(|(idx, sig)| review_signature_to_proto(sig, synthetic_signature_id(idx)))
310 .collect()
311 }
312 None => Vec::new(),
313 };
314
315 Ok(Response::new(ListSignaturesResponse { signatures }))
316 }
317}
318
319async fn execute_sign_state(
322 inner: &GrpcLocalService,
323 req: SignStateRequest,
324) -> Result<SignStateResponse, Status> {
325 let kind = match grpc::heddle::v1::ReviewKind::try_from(req.kind)
327 .map_err(|_| Status::invalid_argument(format!("unknown review kind tag {}", req.kind)))?
328 {
329 grpc::heddle::v1::ReviewKind::Read => ReviewKind::Read,
330 grpc::heddle::v1::ReviewKind::AgentPreview => ReviewKind::AgentPreview,
331 grpc::heddle::v1::ReviewKind::AgentCoReview => ReviewKind::AgentCoReview,
332 grpc::heddle::v1::ReviewKind::Unspecified => {
333 return Err(Status::invalid_argument("review kind is required"));
334 }
335 };
336
337 let change_id = parse_change_id(&req.state_id)?;
339 let repo = inner.repo();
340
341 let scope = match req.scope.as_ref() {
343 Some(s) => proto_scope_to_object(s)?,
344 None => ReviewScope::WholeChange,
345 };
346
347 let actor = repo
355 .get_principal()
356 .map_err(|err| Status::internal(format!("resolve caller principal: {err}")))?;
357 let justification = if req.justification.is_empty() {
358 None
359 } else {
360 Some(req.justification.clone())
361 };
362
363 let now = chrono::Utc::now().timestamp();
364 let signed_at = req.signed_at.as_ref().map(|t| t.seconds).unwrap_or(0);
365 if signed_at == 0 {
366 return Err(Status::invalid_argument(
367 "signed_at is required and must match the timestamp the client signed over",
368 ));
369 }
370 if (signed_at - now).abs() > SIGN_TIMESTAMP_SKEW_SECS {
371 return Err(Status::invalid_argument(format!(
372 "signed_at={signed_at} is too far from server time={now} (max skew {SIGN_TIMESTAMP_SKEW_SECS}s)"
373 )));
374 }
375
376 let new_sig = ReviewSignature {
377 actor,
378 kind,
379 scope: scope.clone(),
380 justification: justification.clone(),
381 signed_at,
382 algorithm: req.algorithm.clone(),
383 public_key: hex::encode(&req.public_key),
384 signature: hex::encode(&req.signature),
385 };
386 new_sig
387 .validate()
388 .map_err(|err| Status::invalid_argument(format!("invalid review signature: {err}")))?;
389
390 let public_key_bytes = req.public_key.clone();
391 let signature_bytes = req.signature.clone();
392 let payload = signing_payload(change_id, kind, &scope, signed_at, justification.as_deref());
393 verify_payload_signature(
394 &payload,
395 &req.algorithm,
396 &public_key_bytes,
397 &signature_bytes,
398 )
399 .map_err(|err| {
400 Status::invalid_argument(format!(
401 "review signature failed verification ({}): {err}",
402 req.algorithm
403 ))
404 })?;
405
406 let _lock = repo
412 .locker()
413 .write()
414 .map_err(|err| Status::internal(err.to_string()))?;
415 let state = repo
416 .store()
417 .get_state(&change_id)
418 .map_err(to_status)?
419 .ok_or_else(|| {
420 Status::not_found(format!("state {} not found", change_id.to_string_full()))
421 })?;
422 let mut blob = match state.review_signatures {
423 Some(hash) => {
424 let raw = repo
425 .store()
426 .get_blob(&hash)
427 .map_err(to_status)?
428 .ok_or_else(|| {
429 Status::internal(format!(
430 "existing review signatures blob {} missing from object store",
431 hash
432 ))
433 })?;
434 ReviewSignaturesBlob::decode(raw.content())
435 .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?
436 }
437 None => ReviewSignaturesBlob::new(Vec::new()),
438 };
439 blob.signatures.push(new_sig);
440 let new_index = blob.signatures.len() - 1;
441
442 let bytes = blob
444 .encode()
445 .map_err(|err| Status::internal(format!("encode review signatures: {err}")))?;
446 let content_hash = repo
447 .store()
448 .put_blob(&Blob::new(bytes))
449 .map_err(to_status)?;
450
451 let new_state = state.with_review_signatures(content_hash);
453 repo.store().put_state(&new_state).map_err(to_status)?;
454
455 Ok(SignStateResponse {
456 signature_id: synthetic_signature_id(new_index),
457 state_id: req.state_id,
458 })
459}
460
461fn synthetic_signature_id(index: usize) -> String {
465 format!("rs-{index}")
466}
467
468fn parse_change_id(s: &[u8]) -> Result<ChangeId, Status> {
469 ChangeId::try_from_slice(s)
470 .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))
471}
472
473fn proto_scope_to_object(scope: &ProtoReviewScope) -> Result<ReviewScope, Status> {
474 use grpc::heddle::v1::review_scope::Scope;
475 match scope.scope.as_ref() {
476 None | Some(Scope::WholeChange(_)) => Ok(ReviewScope::WholeChange),
477 Some(Scope::Symbols(list)) => {
478 if list.symbols.is_empty() {
479 return Err(Status::invalid_argument(
480 "symbols scope requires at least one symbol anchor",
481 ));
482 }
483 let symbols = list
484 .symbols
485 .iter()
486 .map(|s| SymbolAnchor::new(s.file.clone(), s.symbol.clone()))
487 .collect();
488 Ok(ReviewScope::Symbols(symbols))
489 }
490 }
491}
492
493fn object_scope_to_proto(scope: &ReviewScope) -> ProtoReviewScope {
494 use grpc::heddle::v1::review_scope::{Scope, SymbolList, WholeChange};
495 let inner = match scope {
496 ReviewScope::WholeChange => Scope::WholeChange(WholeChange {}),
497 ReviewScope::Symbols(symbols) => Scope::Symbols(SymbolList {
498 symbols: symbols
499 .iter()
500 .map(|s| ProtoPathSymbolRef {
501 file: s.file.clone(),
502 symbol: s.symbol.clone(),
503 })
504 .collect(),
505 }),
506 };
507 ProtoReviewScope { scope: Some(inner) }
508}
509
510fn review_signature_to_proto(sig: ReviewSignature, signature_id: String) -> ProtoReviewSignature {
511 ProtoReviewSignature {
512 signature_id,
513 actor_name: sig.actor.name.clone(),
514 actor_email: sig.actor.email.clone(),
515 kind: review_kind_to_proto(sig.kind) as i32,
516 scope: Some(object_scope_to_proto(&sig.scope)),
517 justification: sig.justification.unwrap_or_default(),
518 signed_at: Some(prost_types::Timestamp {
519 seconds: sig.signed_at,
520 nanos: 0,
521 }),
522 algorithm: sig.algorithm,
523 public_key: hex::decode(&sig.public_key).unwrap_or_default(),
524 signature: hex::decode(&sig.signature).unwrap_or_default(),
525 }
526}
527
528fn review_kind_to_proto(kind: ReviewKind) -> grpc::heddle::v1::ReviewKind {
529 match kind {
530 ReviewKind::Read => grpc::heddle::v1::ReviewKind::Read,
531 ReviewKind::AgentPreview => grpc::heddle::v1::ReviewKind::AgentPreview,
532 ReviewKind::AgentCoReview => grpc::heddle::v1::ReviewKind::AgentCoReview,
533 }
534}
535
536fn risk_signal_to_proto(sig: objects::object::RiskSignal, visibility: &str) -> ProtoRiskSignal {
537 let (start_line, end_line) = sig.anchor.line_range.unwrap_or((0, 0));
538 ProtoRiskSignal {
539 kind: sig.kind.as_str().to_string(),
540 anchor: Some(ProtoSignalAnchor {
541 file: sig.anchor.file,
542 symbol: sig.anchor.symbol.unwrap_or_default(),
543 start_line,
544 end_line,
545 }),
546 reason: sig.reason,
547 producer_module: sig.producer.module,
548 producer_version: sig.producer.version,
549 computed_at: Some(prost_types::Timestamp {
550 seconds: sig.computed_at,
551 nanos: 0,
552 }),
553 visibility: visibility.to_string(),
554 }
555}
556
557fn changed_files_as_symbols(
567 repo: &Repository,
568 state: &State,
569) -> objects::error::Result<Vec<PathSymbol>> {
570 let new_tree = match repo.store().get_tree(&state.tree)? {
571 Some(t) => t,
572 None => return Ok(Vec::new()),
573 };
574 let parent_tree = if let Some(parent_id) = state.parents.first() {
575 repo.store()
576 .get_state(parent_id)?
577 .and_then(|p| repo.store().get_tree(&p.tree).ok().flatten())
578 } else {
579 None
580 };
581 let new_files = collect_files(repo, &new_tree, "")?;
582 let parent_files = match parent_tree {
583 Some(t) => collect_files(repo, &t, "")?,
584 None => std::collections::HashMap::new(),
585 };
586
587 let mut out: Vec<PathSymbol> = Vec::new();
588 for (path, hash) in &new_files {
589 let changed = parent_files.get(path).map(|h| h != hash).unwrap_or(true);
590 if !changed {
591 continue;
592 }
593 #[cfg_attr(not(feature = "semantic"), allow(unused_mut))]
594 let mut emitted_any = false;
595 #[cfg(feature = "semantic")]
596 {
597 if let Some(blob) = repo.store().get_blob(hash)? {
598 emitted_any = extract_file_symbols(path, blob.content(), &mut out);
599 }
600 }
601 let _ = hash;
602 if !emitted_any {
603 out.push(PathSymbol {
604 file: path.clone(),
605 symbol: path.clone(),
606 kind: SymbolKind::Other,
607 });
608 }
609 }
610 Ok(out)
611}
612
613#[cfg(feature = "semantic")]
614fn extract_file_symbols(path: &str, source: &[u8], out: &mut Vec<PathSymbol>) -> bool {
615 use ::semantic::symbol_resolver::{Definition, DefinitionKind, extract_definitions};
616 let definitions: Vec<Definition> = match extract_definitions(source, std::path::Path::new(path))
617 {
618 Ok(defs) => defs,
619 Err(_) => return false,
620 };
621 if definitions.is_empty() {
622 return false;
623 }
624 for d in definitions {
625 let kind = match d.kind {
626 DefinitionKind::Type => SymbolKind::Type,
627 DefinitionKind::Trait => SymbolKind::Trait,
628 DefinitionKind::Class => SymbolKind::Class,
629 DefinitionKind::Interface => SymbolKind::Interface,
630 DefinitionKind::TypeAlias => SymbolKind::TypeAlias,
631 DefinitionKind::EnumDef => SymbolKind::EnumDef,
632 DefinitionKind::ConstDecl => SymbolKind::ConstDecl,
633 DefinitionKind::Module => SymbolKind::Module,
634 DefinitionKind::Function => SymbolKind::Function,
635 DefinitionKind::Other => SymbolKind::Other,
636 };
637 let symbol = match d.parent_name.as_deref() {
638 Some(parent) if !parent.is_empty() => format!("{parent}::{}", d.name),
639 _ => d.name,
640 };
641 out.push(PathSymbol {
642 file: path.to_string(),
643 symbol,
644 kind,
645 });
646 }
647 true
648}
649
650fn collect_files(
651 repo: &Repository,
652 tree: &objects::object::Tree,
653 prefix: &str,
654) -> objects::error::Result<std::collections::HashMap<String, objects::object::ContentHash>> {
655 let mut out = std::collections::HashMap::new();
656 for entry in tree.entries() {
657 let path = if prefix.is_empty() {
658 entry.name.clone()
659 } else {
660 format!("{prefix}/{}", entry.name)
661 };
662 if entry.is_tree() {
663 if let Some(subtree) = repo.store().get_tree(&entry.hash)? {
664 let sub = collect_files(repo, &subtree, &path)?;
665 out.extend(sub);
666 }
667 } else {
668 out.insert(path, entry.hash);
669 }
670 }
671 Ok(out)
672}
673
674fn partition_to_proto(p: ReadingOrderPartition) -> ProtoReadingOrderPartition {
675 ProtoReadingOrderPartition {
676 structural: p.structural.iter().map(path_symbol_to_proto).collect(),
677 consequence: p.consequence.iter().map(path_symbol_to_proto).collect(),
678 tests_and_docs: p.tests_and_docs.iter().map(path_symbol_to_proto).collect(),
679 }
680}
681
682fn path_symbol_to_proto(p: &PathSymbol) -> ProtoPathSymbolRef {
683 ProtoPathSymbolRef {
684 file: p.file.clone(),
685 symbol: p.symbol.clone(),
686 }
687}
688
689fn discussion_to_anchored_proto(d: &Discussion) -> AnchoredDiscussion {
690 AnchoredDiscussion {
691 id: d.id.clone(),
692 anchor: Some(ProtoPathSymbolRef {
693 file: d.anchor.file.clone(),
694 symbol: d.anchor.symbol.clone(),
695 }),
696 opened_against_state: d.opened_against_state.as_bytes().to_vec(),
697 opened_at: Some(prost_types::Timestamp {
698 seconds: d.opened_at,
699 nanos: 0,
700 }),
701 turns: d
702 .turns
703 .iter()
704 .map(|t| grpc::heddle::v1::DiscussionTurn {
705 author_name: t.author.name.clone(),
706 author_email: t.author.email.clone(),
707 body: t.body.clone(),
708 posted_at: Some(prost_types::Timestamp {
709 seconds: t.posted_at,
710 nanos: 0,
711 }),
712 })
713 .collect(),
714 resolution: Some(discussion_resolution_to_proto(&d.resolution)),
715 body_changed_since_open: d.body_changed_since_open,
716 orphaned: d.orphaned,
717 visibility: d.visibility.as_str().to_string(),
718 }
719}
720
721fn discussion_resolution_to_proto(
722 resolution: &DiscussionResolution,
723) -> grpc::heddle::v1::DiscussionResolution {
724 use grpc::heddle::v1::discussion_resolution::{
725 Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
726 };
727 let state = match resolution {
728 DiscussionResolution::Open => State::Open(Open {}),
729 DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
730 State::IntoAnnotation(ResolvedIntoAnnotation {
731 annotation_id: annotation_id.clone(),
732 })
733 }
734 DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
735 state_id: state_id.as_bytes().to_vec(),
736 }),
737 DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
738 reason: reason.clone(),
739 }),
740 };
741 grpc::heddle::v1::DiscussionResolution { state: Some(state) }
742}
743
744#[derive(Debug, Clone)]
752struct ChangedPath {
753 path: String,
754 kind: DiffKind,
755}
756
757impl ChangedPath {
758 fn kind_str(&self) -> &'static str {
759 match self.kind {
760 DiffKind::Added => "added",
761 DiffKind::Modified => "modified",
762 DiffKind::Deleted => "deleted",
763 DiffKind::Unchanged => "unchanged",
764 }
765 }
766}
767
768struct DiffSummary {
775 files_changed: u32,
776 added_files: u32,
777 modified_files: u32,
778 deleted_files: u32,
779 added_lines: u32,
780 removed_lines: u32,
781 changed_paths: Vec<ChangedPath>,
782}
783
784fn compute_state_diff_summary(
793 repo: &Repository,
794 state: &State,
795) -> objects::error::Result<DiffSummary> {
796 use objects::object::Tree;
797 let parent_tree_hash = if let Some(parent_id) = state.parents.first() {
798 match repo.store().get_state(parent_id)? {
799 Some(parent_state) => parent_state.tree,
800 None => Tree::new().hash(),
801 }
802 } else {
803 Tree::new().hash()
804 };
805
806 let parent_tree_obj = repo.store().get_tree(&parent_tree_hash)?;
812 let new_tree_obj = repo.store().get_tree(&state.tree)?;
813
814 let changes = if parent_tree_obj.is_some() && new_tree_obj.is_some() {
820 repo.diff_trees(&parent_tree_hash, &state.tree)?
821 } else {
822 objects::object::FileChangeSet::new()
823 };
824
825 let mut added_lines: u32 = 0;
832 let mut removed_lines: u32 = 0;
833 let mut changed_paths: Vec<ChangedPath> = Vec::with_capacity(changes.len());
834
835 let parent_files = match parent_tree_obj.as_ref() {
836 Some(t) => collect_files(repo, t, "")?,
837 None => std::collections::HashMap::new(),
838 };
839 let new_files = match new_tree_obj.as_ref() {
840 Some(t) => collect_files(repo, t, "")?,
841 None => std::collections::HashMap::new(),
842 };
843
844 let mut added_files: u32 = 0;
845 let mut modified_files: u32 = 0;
846 let mut deleted_files: u32 = 0;
847
848 for change in changes.iter() {
849 match change.kind {
850 DiffKind::Added => {
851 added_files += 1;
852 if let Some(hash) = new_files.get(&change.path)
859 && let Some(blob) = repo.store().get_blob(hash)?
860 {
861 added_lines = added_lines.saturating_add(line_count(blob.content()));
862 }
863 }
864 DiffKind::Deleted => {
865 deleted_files += 1;
866 if let Some(hash) = parent_files.get(&change.path)
867 && let Some(blob) = repo.store().get_blob(hash)?
868 {
869 removed_lines = removed_lines.saturating_add(line_count(blob.content()));
870 }
871 }
872 DiffKind::Modified => {
873 modified_files += 1;
874 let old_blob = match parent_files.get(&change.path) {
881 Some(h) => repo.store().get_blob(h)?,
882 None => None,
883 };
884 let new_blob = match new_files.get(&change.path) {
885 Some(h) => repo.store().get_blob(h)?,
886 None => None,
887 };
888 if let (Some(old), Some(new)) = (old_blob, new_blob) {
889 for line in diff_blobs(&old, &new) {
890 match line {
891 objects::worktree::DiffLine::Added(_) => {
892 added_lines = added_lines.saturating_add(1);
893 }
894 objects::worktree::DiffLine::Removed(_) => {
895 removed_lines = removed_lines.saturating_add(1);
896 }
897 objects::worktree::DiffLine::Context(_) => {}
898 }
899 }
900 }
901 }
902 DiffKind::Unchanged => continue,
903 }
904 changed_paths.push(ChangedPath {
905 path: change.path.clone(),
906 kind: change.kind,
907 });
908 }
909
910 Ok(DiffSummary {
911 files_changed: changed_paths.len() as u32,
912 added_files,
913 modified_files,
914 deleted_files,
915 added_lines,
916 removed_lines,
917 changed_paths,
918 })
919}
920
921fn line_count(content: &[u8]) -> u32 {
926 let Ok(s) = std::str::from_utf8(content) else {
927 return 0;
928 };
929 if s.is_empty() {
930 return 0;
931 }
932 let trimmed = s.strip_suffix('\n').unwrap_or(s);
933 if trimmed.is_empty() {
934 return 1;
935 }
936 (trimmed.matches('\n').count() as u32).saturating_add(1)
937}
938
939#[cfg(test)]
944mod tests {
945 use std::sync::Arc;
946
947 use crypto::Signer as _;
948 use grpc::heddle::v1::ReviewScope as ProtoReviewScope;
949 use repo::{Repository, operation_dedup::OperationDedupStore};
950 use tempfile::TempDir;
951
952 use super::*;
953
954 fn fresh_service() -> (LocalStateReviewService, Arc<Repository>, TempDir) {
955 let temp = TempDir::new().expect("create tempdir");
956 unsafe {
960 std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
961 std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
962 }
963 let repo = Repository::init_default(temp.path()).expect("init repo");
964 let dedup = OperationDedupStore::open(repo.heddle_dir()).expect("open dedup");
965 let repo = Arc::new(repo);
966 let svc =
967 LocalStateReviewService::new(GrpcLocalService::new(repo.clone(), Arc::new(dedup)));
968 (svc, repo, temp)
969 }
970
971 fn capture_state(repo: &Repository) -> ChangeId {
972 std::fs::write(repo.root().join("hello.txt"), b"hi").expect("write file");
974 let state = repo
975 .snapshot(Some("seed".to_string()), None)
976 .expect("snapshot");
977 state.change_id
978 }
979
980 fn sign_request(state_id: &ChangeId, op_id: &str) -> SignStateRequest {
981 let signer = crypto::Ed25519Signer::generate().expect("generate ed25519 key");
982 let scope = ReviewScope::WholeChange;
983 let signed_at = chrono::Utc::now().timestamp();
984 let payload = signing_payload(*state_id, ReviewKind::Read, &scope, signed_at, None);
985 let signature = signer.sign(&payload).expect("sign payload");
986 use grpc::heddle::v1::review_scope::{Scope, WholeChange};
987 SignStateRequest {
988 repo_path: String::new(),
989 state_id: state_id.as_bytes().to_vec(),
990 kind: grpc::heddle::v1::ReviewKind::Read as i32,
991 scope: Some(ProtoReviewScope {
992 scope: Some(Scope::WholeChange(WholeChange {})),
993 }),
994 justification: String::new(),
995 algorithm: "ed25519".to_string(),
996 public_key: signer.public_key().to_vec(),
997 signature: signature.clone(),
998 signed_at: Some(prost_types::Timestamp {
999 seconds: signed_at,
1000 nanos: 0,
1001 }),
1002 client_operation_id: op_id.to_string(),
1003 }
1004 }
1005
1006 #[tokio::test]
1007 #[serial_test::serial(process_global)]
1008 async fn sign_state_persists_to_review_signatures_blob() {
1009 let (svc, repo, _tmp) = fresh_service();
1010 let state_id = capture_state(&repo);
1011
1012 let resp = svc
1013 .sign_state(Request::new(sign_request(&state_id, "")))
1014 .await
1015 .expect("sign_state");
1016 assert!(!resp.get_ref().signature_id.is_empty());
1017 assert_eq!(resp.get_ref().state_id, state_id.as_bytes().to_vec());
1018
1019 let listing = svc
1020 .list_signatures(Request::new(ListSignaturesRequest {
1021 repo_path: String::new(),
1022 state_id: state_id.as_bytes().to_vec(),
1023 }))
1024 .await
1025 .expect("list_signatures");
1026 let sigs = &listing.get_ref().signatures;
1027 assert_eq!(sigs.len(), 1, "expected one signature, got {sigs:?}");
1028 assert_eq!(sigs[0].kind, grpc::heddle::v1::ReviewKind::Read as i32);
1029 assert_eq!(sigs[0].algorithm, "ed25519");
1030 assert_eq!(sigs[0].actor_name, "Alice Tester");
1031 assert_eq!(sigs[0].actor_email, "alice@example.com");
1032 let scope_case = sigs[0].scope.as_ref().and_then(|s| s.scope.as_ref());
1033 assert!(matches!(
1034 scope_case,
1035 Some(grpc::heddle::v1::review_scope::Scope::WholeChange(_))
1036 ));
1037 }
1038
1039 #[tokio::test]
1040 #[serial_test::serial(process_global)]
1041 async fn sign_state_idempotent() {
1042 let (svc, repo, _tmp) = fresh_service();
1043 let state_id = capture_state(&repo);
1044 let op_id = objects::object::OperationId::new().to_string();
1045 let req = sign_request(&state_id, &op_id);
1048
1049 let first = svc
1050 .sign_state(Request::new(req.clone()))
1051 .await
1052 .expect("first sign_state");
1053 let second = svc
1054 .sign_state(Request::new(req))
1055 .await
1056 .expect("second sign_state");
1057 assert_eq!(
1058 first.get_ref().signature_id,
1059 second.get_ref().signature_id,
1060 "replayed call must return the same signature_id"
1061 );
1062
1063 let listing = svc
1064 .list_signatures(Request::new(ListSignaturesRequest {
1065 repo_path: String::new(),
1066 state_id: state_id.as_bytes().to_vec(),
1067 }))
1068 .await
1069 .expect("list_signatures");
1070 assert_eq!(
1071 listing.get_ref().signatures.len(),
1072 1,
1073 "idempotent replay must not append a duplicate signature"
1074 );
1075 }
1076
1077 #[tokio::test]
1078 #[serial_test::serial(process_global)]
1079 async fn sign_state_rejects_forged_signature() {
1080 let (svc, repo, _tmp) = fresh_service();
1081 let state_id = capture_state(&repo);
1082 let mut req = sign_request(&state_id, "");
1083 let last = req.signature.len() - 1;
1085 req.signature[last] ^= 0xff;
1086
1087 let err = svc
1088 .sign_state(Request::new(req))
1089 .await
1090 .expect_err("forged signature must be rejected");
1091 assert_eq!(err.code(), tonic::Code::InvalidArgument, "{err:?}");
1092 assert!(
1093 err.message().contains("failed verification"),
1094 "unexpected error message: {}",
1095 err.message()
1096 );
1097 }
1098
1099 #[tokio::test]
1100 #[serial_test::serial(process_global)]
1101 async fn sign_state_rejects_skewed_timestamp() {
1102 let (svc, repo, _tmp) = fresh_service();
1103 let state_id = capture_state(&repo);
1104 let mut req = sign_request(&state_id, "");
1105 if let Some(ts) = req.signed_at.as_mut() {
1107 ts.seconds += 60 * 60;
1108 }
1109
1110 let err = svc
1111 .sign_state(Request::new(req))
1112 .await
1113 .expect_err("skewed timestamp must be rejected");
1114 assert_eq!(err.code(), tonic::Code::InvalidArgument);
1115 assert!(err.message().contains("too far from server time"));
1116 }
1117
1118 #[tokio::test]
1119 #[serial_test::serial(process_global)]
1120 async fn sign_state_attributes_to_caller_not_state_author() {
1121 let (svc, repo, _tmp) = fresh_service();
1127 let state_id = capture_state(&repo);
1131
1132 unsafe {
1135 std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Bob Signer");
1136 std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "bob@example.com");
1137 }
1138
1139 svc.sign_state(Request::new(sign_request(&state_id, "")))
1140 .await
1141 .expect("sign_state");
1142
1143 let listing = svc
1144 .list_signatures(Request::new(ListSignaturesRequest {
1145 repo_path: String::new(),
1146 state_id: state_id.as_bytes().to_vec(),
1147 }))
1148 .await
1149 .expect("list_signatures");
1150 let sigs = &listing.get_ref().signatures;
1151 assert_eq!(sigs.len(), 1);
1152 assert_eq!(
1153 sigs[0].actor_name, "Bob Signer",
1154 "signature must attribute to the caller (Bob), not the state author (Alice)"
1155 );
1156 assert_eq!(sigs[0].actor_email, "bob@example.com");
1157
1158 unsafe {
1161 std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
1162 std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
1163 }
1164 }
1165
1166 #[tokio::test]
1167 #[serial_test::serial(process_global)]
1168 async fn sign_state_serializes_concurrent_appends() {
1169 let (svc, repo, _tmp) = fresh_service();
1176 let state_id = capture_state(&repo);
1177
1178 let op_a = objects::object::OperationId::new().to_string();
1180 let op_b = objects::object::OperationId::new().to_string();
1181 let req_a = sign_request(&state_id, &op_a);
1182 let req_b = sign_request(&state_id, &op_b);
1183
1184 let svc_a = svc.clone();
1185 let svc_b = svc.clone();
1186 let (a, b) = tokio::join!(
1187 svc_a.sign_state(Request::new(req_a)),
1188 svc_b.sign_state(Request::new(req_b)),
1189 );
1190 a.expect("first sign_state");
1191 b.expect("second sign_state");
1192
1193 let listing = svc
1194 .list_signatures(Request::new(ListSignaturesRequest {
1195 repo_path: String::new(),
1196 state_id: state_id.as_bytes().to_vec(),
1197 }))
1198 .await
1199 .expect("list_signatures");
1200 assert_eq!(
1201 listing.get_ref().signatures.len(),
1202 2,
1203 "both concurrent signatures must land — neither should be lost \
1204 to a stale-blob clobber"
1205 );
1206 }
1207
1208 #[tokio::test]
1216 #[serial_test::serial(process_global)]
1217 async fn get_review_payload_populates_diff_summary_and_signals() {
1218 let (svc, repo, _tmp) = fresh_service();
1219
1220 std::fs::write(repo.root().join("hello.txt"), b"first\nsecond\nthird\n")
1223 .expect("write hello.txt");
1224 let first = repo
1225 .snapshot(Some("first capture".to_string()), None)
1226 .expect("first snapshot");
1227
1228 let resp_first = svc
1229 .get_review_payload(Request::new(GetReviewPayloadRequest {
1230 repo_path: String::new(),
1231 state_id: first.change_id.as_bytes().to_vec(),
1232 include_all_signals: false,
1233 }))
1234 .await
1235 .expect("get_review_payload first");
1236 let payload_first = resp_first.into_inner();
1237 let summary_first = payload_first.summary.as_ref().expect("summary present");
1238 assert!(
1239 summary_first.files_changed >= 1,
1240 "first state should report at least one file changed (vs empty parent), got {}",
1241 summary_first.files_changed
1242 );
1243 assert!(
1244 summary_first.added_lines >= 3,
1245 "first state should report 3+ added lines, got {}",
1246 summary_first.added_lines
1247 );
1248 assert!(
1249 !payload_first.in_budget_signals.is_empty(),
1250 "in_budget_signals must include a diff_summary entry"
1251 );
1252 let first_signal = &payload_first.in_budget_signals[0];
1253 assert!(
1254 first_signal.kind.starts_with("diff_summary."),
1255 "expected synthetic diff_summary signal kind, got {}",
1256 first_signal.kind
1257 );
1258 assert_eq!(first_signal.producer_module, "review_show.diff_summary");
1259 assert_eq!(first_signal.visibility, "visible");
1260
1261 std::fs::write(
1265 repo.root().join("hello.txt"),
1266 b"first\nsecond\nthird\nfourth\n",
1267 )
1268 .expect("modify hello.txt");
1269 let second = repo
1270 .snapshot(Some("second capture".to_string()), None)
1271 .expect("second snapshot");
1272
1273 let resp_second = svc
1274 .get_review_payload(Request::new(GetReviewPayloadRequest {
1275 repo_path: String::new(),
1276 state_id: second.change_id.as_bytes().to_vec(),
1277 include_all_signals: false,
1278 }))
1279 .await
1280 .expect("get_review_payload second");
1281 let payload_second = resp_second.into_inner();
1282 let summary_second = payload_second.summary.as_ref().expect("summary present");
1283 assert_eq!(
1284 summary_second.files_changed, 1,
1285 "second state should report exactly one modified file"
1286 );
1287 assert!(
1288 summary_second.added_lines >= 1,
1289 "second state should report at least one added line, got {}",
1290 summary_second.added_lines
1291 );
1292 assert!(
1293 !payload_second.in_budget_signals.is_empty(),
1294 "in_budget_signals must include a diff_summary entry"
1295 );
1296 let signal = &payload_second.in_budget_signals[0];
1297 assert_eq!(
1298 signal
1299 .anchor
1300 .as_ref()
1301 .map(|a| a.file.as_str())
1302 .unwrap_or(""),
1303 "hello.txt",
1304 "diff_summary signal should anchor on the modified file"
1305 );
1306 assert!(
1307 signal.reason.contains("files changed"),
1308 "first signal reason should carry the aggregate summary, got {}",
1309 signal.reason
1310 );
1311 assert_eq!(
1315 summary_second.in_budget_signal_count,
1316 payload_second.in_budget_signals.len() as u32,
1317 "in_budget_signal_count must match the array length"
1318 );
1319 }
1320
1321 #[tokio::test]
1333 #[serial_test::serial(process_global)]
1334 async fn get_review_payload_tolerates_missing_tree() {
1335 let (svc, repo, _tmp) = fresh_service();
1336 let state_id = capture_state(&repo);
1337
1338 let state = repo
1343 .store()
1344 .get_state(&state_id)
1345 .expect("get state")
1346 .expect("state present");
1347 let bogus_tree = objects::object::ContentHash::compute(b"definitely-not-in-store-bytes");
1348 let mut mutated = state.clone();
1349 mutated.tree = bogus_tree;
1350 repo.store().put_state(&mutated).expect("put mutated state");
1351
1352 let resp = svc
1355 .get_review_payload(Request::new(GetReviewPayloadRequest {
1356 repo_path: String::new(),
1357 state_id: state_id.as_bytes().to_vec(),
1358 include_all_signals: false,
1359 }))
1360 .await
1361 .expect("missing tree must not block review payload");
1362 let payload = resp.into_inner();
1363 let summary = payload.summary.as_ref().expect("summary present");
1364 assert_eq!(
1365 summary.files_changed, 0,
1366 "missing tree must produce a zero-change summary, got {} files",
1367 summary.files_changed
1368 );
1369 assert!(
1373 !payload.in_budget_signals.is_empty(),
1374 "in_budget_signals should always contain at least the synthetic diff_summary entry"
1375 );
1376 let kind = &payload.in_budget_signals[0].kind;
1377 assert!(
1378 kind.starts_with("diff_summary."),
1379 "expected synthetic diff_summary signal, got {kind}"
1380 );
1381 }
1382
1383 #[test]
1387 fn line_count_matches_git_semantics() {
1388 assert_eq!(line_count(b""), 0);
1389 assert_eq!(line_count(b"\n"), 1);
1390 assert_eq!(line_count(b"hello"), 1);
1391 assert_eq!(line_count(b"hello\n"), 1);
1392 assert_eq!(line_count(b"hello\nworld"), 2);
1393 assert_eq!(line_count(b"hello\nworld\n"), 2);
1394 assert_eq!(line_count(b"a\nb\nc\n"), 3);
1395 assert_eq!(line_count(&[0xff, 0xfe, 0xfd]), 0);
1397 }
1398}