1use ::state_review::{
12 PathSymbol, ReadingOrderPartition, SymbolKind, build_review_payload_partition,
13};
14use crypto::verify_payload_signature;
15use grpc::heddle::v1::{
16 AnchoredDiscussion, GetReviewPayloadRequest, ListSignaturesRequest, ListSignaturesResponse,
17 MergeRequirement, PathSymbolRef as ProtoPathSymbolRef,
18 ReadingOrderPartition as ProtoReadingOrderPartition, ReviewPayload,
19 ReviewScope as ProtoReviewScope, ReviewSignature as ProtoReviewSignature, ReviewSummary,
20 RiskSignal as ProtoRiskSignal, SignStateRequest, SignStateResponse,
21 SignalAnchor as ProtoSignalAnchor, SigningFooter,
22 state_review_service_server::StateReviewService,
23};
24use objects::{
25 lock::RepositoryLockExt,
26 object::{
27 Blob, ChangeId, DiffKind, Discussion, DiscussionResolution, DiscussionsBlob, ReviewKind,
28 ReviewScope, ReviewSignature, ReviewSignaturesBlob, RiskSignalBlob, State, SymbolAnchor,
29 signing_payload,
30 },
31 worktree::diff_blobs,
32};
33use prost::Message;
34use repo::Repository;
35use tonic::{Request, Response, Status};
36
37use super::{GrpcLocalService, to_status, with_idempotency};
38
/// Maximum allowed absolute difference, in seconds, between a client-supplied
/// `signed_at` timestamp and the server clock when a signature is submitted
/// (five minutes).
const SIGN_TIMESTAMP_SKEW_SECS: i64 = 5 * 60;
43
44#[derive(Clone)]
46pub struct LocalStateReviewService {
47 inner: GrpcLocalService,
48}
49
50impl LocalStateReviewService {
51 pub fn new(inner: GrpcLocalService) -> Self {
52 Self { inner }
53 }
54}
55
56#[tonic::async_trait]
57impl StateReviewService for LocalStateReviewService {
58 async fn get_review_payload(
59 &self,
60 request: Request<GetReviewPayloadRequest>,
61 ) -> Result<Response<ReviewPayload>, Status> {
62 let req = request.into_inner();
63 let change_id = parse_change_id(&req.state_id)?;
64 let repo = self.inner.repo();
65 let state = repo
66 .store()
67 .get_state(&change_id)
68 .map_err(to_status)?
69 .ok_or_else(|| {
70 Status::not_found(format!("state {} not found", change_id.to_string_full()))
71 })?;
72
73 let diff_summary = compute_state_diff_summary(repo, &state).map_err(to_status)?;
79
80 let summary = ReviewSummary {
81 headline: state.intent.clone().unwrap_or_default(),
82 files_changed: diff_summary.files_changed,
83 added_lines: diff_summary.added_lines,
84 removed_lines: diff_summary.removed_lines,
85 in_budget_signal_count: 0,
86 hidden_signal_count: 0,
87 };
88
89 let agent_narrative = if state.attribution.agent.is_some() {
90 state.intent.clone().unwrap_or_default()
91 } else {
92 String::new()
93 };
94
95 let mut all_signals: Vec<ProtoRiskSignal> = Vec::new();
99 if req.include_all_signals
100 && let Some(hash) = state.risk_signals
101 && let Some(blob) = repo.store().get_blob(&hash).map_err(to_status)?
102 {
103 let decoded = RiskSignalBlob::decode(blob.content())
104 .map_err(|err| Status::internal(format!("decode risk signals: {err}")))?;
105 all_signals = decoded
106 .signals
107 .into_iter()
108 .map(|s| risk_signal_to_proto(s, "visible"))
109 .collect();
110 }
111
112 let mut in_budget_signals: Vec<ProtoRiskSignal> = Vec::new();
121 let summary_kind = match (
122 diff_summary.added_files,
123 diff_summary.modified_files,
124 diff_summary.deleted_files,
125 ) {
126 (a, 0, 0) if a > 0 => "diff_summary.added_only",
127 (0, m, 0) if m > 0 => "diff_summary.modified_only",
128 (0, 0, d) if d > 0 => "diff_summary.deleted_only",
129 (0, 0, 0) => "diff_summary.empty",
130 _ => "diff_summary.mixed",
131 };
132 let summary_reason = format!(
133 "{} files changed (+{}/-{}, {} added, {} modified, {} deleted)",
134 diff_summary.files_changed,
135 diff_summary.added_lines,
136 diff_summary.removed_lines,
137 diff_summary.added_files,
138 diff_summary.modified_files,
139 diff_summary.deleted_files,
140 );
141 const MAX_DIFF_SIGNAL_ANCHORS: usize = 32;
146 if diff_summary.changed_paths.is_empty() {
147 in_budget_signals.push(ProtoRiskSignal {
148 kind: summary_kind.to_string(),
149 anchor: Some(ProtoSignalAnchor {
150 file: String::new(),
151 symbol: String::new(),
152 start_line: 0,
153 end_line: 0,
154 }),
155 reason: summary_reason.clone(),
156 producer_module: "review_show.diff_summary".to_string(),
157 producer_version: 1,
158 computed_at: None,
159 visibility: "visible".to_string(),
160 });
161 } else {
162 for (idx, path_kind) in diff_summary
163 .changed_paths
164 .iter()
165 .take(MAX_DIFF_SIGNAL_ANCHORS)
166 .enumerate()
167 {
168 let reason = if idx == 0 {
169 summary_reason.clone()
170 } else {
171 format!("{} ({})", path_kind.path, path_kind.kind_str())
172 };
173 in_budget_signals.push(ProtoRiskSignal {
174 kind: summary_kind.to_string(),
175 anchor: Some(ProtoSignalAnchor {
176 file: path_kind.path.clone(),
177 symbol: String::new(),
178 start_line: 0,
179 end_line: 0,
180 }),
181 reason,
182 producer_module: "review_show.diff_summary".to_string(),
183 producer_version: 1,
184 computed_at: None,
185 visibility: "visible".to_string(),
186 });
187 }
188 }
189
190 let symbols = changed_files_as_symbols(repo, &state).map_err(to_status)?;
194 let partition = build_review_payload_partition(&symbols);
195
196 let discussions = match state.discussions {
199 Some(hash) => {
200 let blob = repo
201 .store()
202 .get_blob(&hash)
203 .map_err(to_status)?
204 .ok_or_else(|| {
205 Status::internal(format!(
206 "discussions blob {} referenced by state {} is missing",
207 hash,
208 state.change_id.to_string_full()
209 ))
210 })?;
211 let decoded = DiscussionsBlob::decode(blob.content())
212 .map_err(|err| Status::internal(format!("decode discussions: {err}")))?;
213 decoded
214 .discussions
215 .iter()
216 .map(discussion_to_anchored_proto)
217 .collect()
218 }
219 None => Vec::<AnchoredDiscussion>::new(),
220 };
221
222 let mut summary = summary;
223 summary.in_budget_signal_count = in_budget_signals.len() as u32;
224 summary.hidden_signal_count =
225 all_signals.len().saturating_sub(in_budget_signals.len()) as u32;
226
227 let payload = ReviewPayload {
228 state_id: req.state_id.clone(),
229 summary: Some(summary),
230 agent_narrative,
231 partition: Some(partition_to_proto(partition)),
232 in_budget_signals,
233 all_signals,
234 tick_budget: 3,
235 discussions,
236 merge_requirements: Vec::<MergeRequirement>::new(),
239 signing_footer: Some(SigningFooter {
240 available_kinds: vec![
241 grpc::heddle::v1::ReviewKind::Read as i32,
242 grpc::heddle::v1::ReviewKind::AgentPreview as i32,
243 grpc::heddle::v1::ReviewKind::AgentCoReview as i32,
244 ],
245 }),
246 };
247
248 Ok(Response::new(payload))
249 }
250
251 async fn sign_state(
252 &self,
253 request: Request<SignStateRequest>,
254 ) -> Result<Response<SignStateResponse>, Status> {
255 let req = request.into_inner();
256 let req_bytes = req.encode_to_vec();
257 let client_operation_id = req.client_operation_id.clone();
258 let inner = self.inner.clone();
259
260 let response = with_idempotency(
261 self.inner.dedup(),
262 &client_operation_id,
263 "state_review.sign_state",
264 &req_bytes,
265 |resp: &SignStateResponse| resp.encode_to_vec(),
266 |bytes| {
267 SignStateResponse::decode(&bytes[..]).map_err(|e| Status::internal(e.to_string()))
268 },
269 move || {
270 let inner = inner.clone();
271 async move { execute_sign_state(&inner, req).await }
272 },
273 )
274 .await?;
275
276 Ok(Response::new(response))
277 }
278
279 async fn list_signatures(
280 &self,
281 request: Request<ListSignaturesRequest>,
282 ) -> Result<Response<ListSignaturesResponse>, Status> {
283 let req = request.into_inner();
284 let change_id = parse_change_id(&req.state_id)?;
285 let repo = self.inner.repo();
286 let state = repo
287 .store()
288 .get_state(&change_id)
289 .map_err(to_status)?
290 .ok_or_else(|| {
291 Status::not_found(format!("state {} not found", change_id.to_string_full()))
292 })?;
293
294 let signatures = match state.review_signatures {
295 Some(hash) => {
296 let blob = repo
297 .store()
298 .get_blob(&hash)
299 .map_err(to_status)?
300 .ok_or_else(|| {
301 Status::internal(format!(
302 "review signatures blob {} missing from object store",
303 hash
304 ))
305 })?;
306 let decoded = ReviewSignaturesBlob::decode(blob.content())
307 .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?;
308 decoded
309 .signatures
310 .into_iter()
311 .enumerate()
312 .map(|(idx, sig)| review_signature_to_proto(sig, synthetic_signature_id(idx)))
313 .collect()
314 }
315 None => Vec::new(),
316 };
317
318 Ok(Response::new(ListSignaturesResponse { signatures }))
319 }
320}
321
322async fn execute_sign_state(
325 inner: &GrpcLocalService,
326 req: SignStateRequest,
327) -> Result<SignStateResponse, Status> {
328 let kind = match grpc::heddle::v1::ReviewKind::try_from(req.kind)
330 .map_err(|_| Status::invalid_argument(format!("unknown review kind tag {}", req.kind)))?
331 {
332 grpc::heddle::v1::ReviewKind::Read => ReviewKind::Read,
333 grpc::heddle::v1::ReviewKind::AgentPreview => ReviewKind::AgentPreview,
334 grpc::heddle::v1::ReviewKind::AgentCoReview => ReviewKind::AgentCoReview,
335 grpc::heddle::v1::ReviewKind::Unspecified => {
336 return Err(Status::invalid_argument("review kind is required"));
337 }
338 };
339
340 let change_id = parse_change_id(&req.state_id)?;
342 let repo = inner.repo();
343
344 let scope = match req.scope.as_ref() {
346 Some(s) => proto_scope_to_object(s)?,
347 None => ReviewScope::WholeChange,
348 };
349
350 let actor = repo
358 .get_principal()
359 .map_err(|err| Status::internal(format!("resolve caller principal: {err}")))?;
360 let justification = if req.justification.is_empty() {
361 None
362 } else {
363 Some(req.justification.clone())
364 };
365
366 let now = chrono::Utc::now().timestamp();
367 let signed_at = req.signed_at.as_ref().map(|t| t.seconds).unwrap_or(0);
368 if signed_at == 0 {
369 return Err(Status::invalid_argument(
370 "signed_at is required and must match the timestamp the client signed over",
371 ));
372 }
373 if (signed_at - now).abs() > SIGN_TIMESTAMP_SKEW_SECS {
374 return Err(Status::invalid_argument(format!(
375 "signed_at={signed_at} is too far from server time={now} (max skew {SIGN_TIMESTAMP_SKEW_SECS}s)"
376 )));
377 }
378
379 let new_sig = ReviewSignature {
380 actor,
381 kind,
382 scope: scope.clone(),
383 justification: justification.clone(),
384 signed_at,
385 algorithm: req.algorithm.clone(),
386 public_key: hex::encode(&req.public_key),
387 signature: hex::encode(&req.signature),
388 };
389 new_sig
390 .validate()
391 .map_err(|err| Status::invalid_argument(format!("invalid review signature: {err}")))?;
392
393 let public_key_bytes = req.public_key.clone();
394 let signature_bytes = req.signature.clone();
395 let payload = signing_payload(change_id, kind, &scope, signed_at, justification.as_deref());
396 verify_payload_signature(
397 &payload,
398 &req.algorithm,
399 &public_key_bytes,
400 &signature_bytes,
401 )
402 .map_err(|err| {
403 Status::invalid_argument(format!(
404 "review signature failed verification ({}): {err}",
405 req.algorithm
406 ))
407 })?;
408
409 let _lock = repo
415 .locker()
416 .write()
417 .map_err(|err| Status::internal(err.to_string()))?;
418 let state = repo
419 .store()
420 .get_state(&change_id)
421 .map_err(to_status)?
422 .ok_or_else(|| {
423 Status::not_found(format!("state {} not found", change_id.to_string_full()))
424 })?;
425 let mut blob = match state.review_signatures {
426 Some(hash) => {
427 let raw = repo
428 .store()
429 .get_blob(&hash)
430 .map_err(to_status)?
431 .ok_or_else(|| {
432 Status::internal(format!(
433 "existing review signatures blob {} missing from object store",
434 hash
435 ))
436 })?;
437 ReviewSignaturesBlob::decode(raw.content())
438 .map_err(|err| Status::internal(format!("decode review signatures: {err}")))?
439 }
440 None => ReviewSignaturesBlob::new(Vec::new()),
441 };
442 blob.signatures.push(new_sig);
443 let new_index = blob.signatures.len() - 1;
444
445 let bytes = blob
447 .encode()
448 .map_err(|err| Status::internal(format!("encode review signatures: {err}")))?;
449 let content_hash = repo
450 .store()
451 .put_blob(&Blob::new(bytes))
452 .map_err(to_status)?;
453
454 let new_state = state.with_review_signatures(content_hash);
456 repo.store().put_state(&new_state).map_err(to_status)?;
457
458 Ok(SignStateResponse {
459 signature_id: synthetic_signature_id(new_index),
460 state_id: req.state_id,
461 })
462}
463
/// Builds the id reported for the signature stored at position `index` in a
/// state's signatures blob: the literal prefix `rs-` followed by the index.
fn synthetic_signature_id(index: usize) -> String {
    let mut id = String::from("rs-");
    id.push_str(&index.to_string());
    id
}
470
471fn parse_change_id(s: &[u8]) -> Result<ChangeId, Status> {
472 ChangeId::try_from_slice(s)
473 .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))
474}
475
476fn proto_scope_to_object(scope: &ProtoReviewScope) -> Result<ReviewScope, Status> {
477 use grpc::heddle::v1::review_scope::Scope;
478 match scope.scope.as_ref() {
479 None | Some(Scope::WholeChange(_)) => Ok(ReviewScope::WholeChange),
480 Some(Scope::Symbols(list)) => {
481 if list.symbols.is_empty() {
482 return Err(Status::invalid_argument(
483 "symbols scope requires at least one symbol anchor",
484 ));
485 }
486 let symbols = list
487 .symbols
488 .iter()
489 .map(|s| SymbolAnchor::new(s.file.clone(), s.symbol.clone()))
490 .collect();
491 Ok(ReviewScope::Symbols(symbols))
492 }
493 }
494}
495
496fn object_scope_to_proto(scope: &ReviewScope) -> ProtoReviewScope {
497 use grpc::heddle::v1::review_scope::{Scope, SymbolList, WholeChange};
498 let inner = match scope {
499 ReviewScope::WholeChange => Scope::WholeChange(WholeChange {}),
500 ReviewScope::Symbols(symbols) => Scope::Symbols(SymbolList {
501 symbols: symbols
502 .iter()
503 .map(|s| ProtoPathSymbolRef {
504 file: s.file.clone(),
505 symbol: s.symbol.clone(),
506 })
507 .collect(),
508 }),
509 };
510 ProtoReviewScope { scope: Some(inner) }
511}
512
513fn review_signature_to_proto(sig: ReviewSignature, signature_id: String) -> ProtoReviewSignature {
514 ProtoReviewSignature {
515 signature_id,
516 actor_name: sig.actor.name.clone(),
517 actor_email: sig.actor.email.clone(),
518 kind: review_kind_to_proto(sig.kind) as i32,
519 scope: Some(object_scope_to_proto(&sig.scope)),
520 justification: sig.justification.unwrap_or_default(),
521 signed_at: Some(prost_types::Timestamp {
522 seconds: sig.signed_at,
523 nanos: 0,
524 }),
525 algorithm: sig.algorithm,
526 public_key: hex::decode(&sig.public_key).unwrap_or_default(),
527 signature: hex::decode(&sig.signature).unwrap_or_default(),
528 }
529}
530
531fn review_kind_to_proto(kind: ReviewKind) -> grpc::heddle::v1::ReviewKind {
532 match kind {
533 ReviewKind::Read => grpc::heddle::v1::ReviewKind::Read,
534 ReviewKind::AgentPreview => grpc::heddle::v1::ReviewKind::AgentPreview,
535 ReviewKind::AgentCoReview => grpc::heddle::v1::ReviewKind::AgentCoReview,
536 }
537}
538
539fn risk_signal_to_proto(sig: objects::object::RiskSignal, visibility: &str) -> ProtoRiskSignal {
540 let (start_line, end_line) = sig.anchor.line_range.unwrap_or((0, 0));
541 ProtoRiskSignal {
542 kind: sig.kind.as_str().to_string(),
543 anchor: Some(ProtoSignalAnchor {
544 file: sig.anchor.file,
545 symbol: sig.anchor.symbol.unwrap_or_default(),
546 start_line,
547 end_line,
548 }),
549 reason: sig.reason,
550 producer_module: sig.producer.module,
551 producer_version: sig.producer.version,
552 computed_at: Some(prost_types::Timestamp {
553 seconds: sig.computed_at,
554 nanos: 0,
555 }),
556 visibility: visibility.to_string(),
557 }
558}
559
560fn changed_files_as_symbols(
570 repo: &Repository,
571 state: &State,
572) -> objects::error::Result<Vec<PathSymbol>> {
573 let new_tree = match repo.store().get_tree(&state.tree)? {
574 Some(t) => t,
575 None => return Ok(Vec::new()),
576 };
577 let parent_tree = if let Some(parent_id) = state.parents.first() {
578 repo.store()
579 .get_state(parent_id)?
580 .and_then(|p| repo.store().get_tree(&p.tree).ok().flatten())
581 } else {
582 None
583 };
584 let new_files = collect_files(repo, &new_tree, "")?;
585 let parent_files = match parent_tree {
586 Some(t) => collect_files(repo, &t, "")?,
587 None => std::collections::HashMap::new(),
588 };
589
590 let mut out: Vec<PathSymbol> = Vec::new();
591 for (path, hash) in &new_files {
592 let changed = parent_files.get(path).map(|h| h != hash).unwrap_or(true);
593 if !changed {
594 continue;
595 }
596 #[cfg_attr(not(feature = "semantic"), allow(unused_mut))]
597 let mut emitted_any = false;
598 #[cfg(feature = "semantic")]
599 {
600 if let Some(blob) = repo.store().get_blob(hash)? {
601 emitted_any = extract_file_symbols(path, blob.content(), &mut out);
602 }
603 }
604 let _ = hash;
605 if !emitted_any {
606 out.push(PathSymbol {
607 file: path.clone(),
608 symbol: path.clone(),
609 kind: SymbolKind::Other,
610 });
611 }
612 }
613 Ok(out)
614}
615
/// Appends one [`PathSymbol`] per definition extracted from `source` to `out`.
///
/// Returns `true` when at least one symbol was appended, and `false` when
/// extraction fails or finds no definitions (in which case `out` is left
/// untouched). Definitions with a non-empty parent name are reported as
/// `parent::name`.
#[cfg(feature = "semantic")]
fn extract_file_symbols(path: &str, source: &[u8], out: &mut Vec<PathSymbol>) -> bool {
    use ::semantic::symbol_resolver::{Definition, DefinitionKind, extract_definitions};

    let Ok(definitions): Result<Vec<Definition>, _> =
        extract_definitions(source, std::path::Path::new(path))
    else {
        return false;
    };
    if definitions.is_empty() {
        return false;
    }

    out.extend(definitions.into_iter().map(|def| {
        let kind = match def.kind {
            DefinitionKind::Type => SymbolKind::Type,
            DefinitionKind::Trait => SymbolKind::Trait,
            DefinitionKind::Class => SymbolKind::Class,
            DefinitionKind::Interface => SymbolKind::Interface,
            DefinitionKind::TypeAlias => SymbolKind::TypeAlias,
            DefinitionKind::EnumDef => SymbolKind::EnumDef,
            DefinitionKind::ConstDecl => SymbolKind::ConstDecl,
            DefinitionKind::Module => SymbolKind::Module,
            DefinitionKind::Function => SymbolKind::Function,
            DefinitionKind::Other => SymbolKind::Other,
        };
        // Qualify the name with its parent when one is present.
        let symbol = match def.parent_name.as_deref() {
            Some(parent) if !parent.is_empty() => format!("{parent}::{}", def.name),
            _ => def.name,
        };
        PathSymbol {
            file: path.to_string(),
            symbol,
            kind,
        }
    }));
    true
}
652
653fn collect_files(
654 repo: &Repository,
655 tree: &objects::object::Tree,
656 prefix: &str,
657) -> objects::error::Result<std::collections::HashMap<String, objects::object::ContentHash>> {
658 let mut out = std::collections::HashMap::new();
659 for entry in tree.entries() {
660 let path = if prefix.is_empty() {
661 entry.name.clone()
662 } else {
663 format!("{prefix}/{}", entry.name)
664 };
665 if entry.is_tree() {
666 if let Some(subtree) = repo.store().get_tree(&entry.hash)? {
667 let sub = collect_files(repo, &subtree, &path)?;
668 out.extend(sub);
669 }
670 } else {
671 out.insert(path, entry.hash);
672 }
673 }
674 Ok(out)
675}
676
677fn partition_to_proto(p: ReadingOrderPartition) -> ProtoReadingOrderPartition {
678 ProtoReadingOrderPartition {
679 structural: p.structural.iter().map(path_symbol_to_proto).collect(),
680 consequence: p.consequence.iter().map(path_symbol_to_proto).collect(),
681 tests_and_docs: p.tests_and_docs.iter().map(path_symbol_to_proto).collect(),
682 }
683}
684
685fn path_symbol_to_proto(p: &PathSymbol) -> ProtoPathSymbolRef {
686 ProtoPathSymbolRef {
687 file: p.file.clone(),
688 symbol: p.symbol.clone(),
689 }
690}
691
692fn discussion_to_anchored_proto(d: &Discussion) -> AnchoredDiscussion {
693 AnchoredDiscussion {
694 id: d.id.clone(),
695 anchor: Some(ProtoPathSymbolRef {
696 file: d.anchor.file.clone(),
697 symbol: d.anchor.symbol.clone(),
698 }),
699 opened_against_state: d.opened_against_state.as_bytes().to_vec(),
700 opened_at: Some(prost_types::Timestamp {
701 seconds: d.opened_at,
702 nanos: 0,
703 }),
704 turns: d
705 .turns
706 .iter()
707 .map(|t| grpc::heddle::v1::DiscussionTurn {
708 author_name: t.author.name.clone(),
709 author_email: t.author.email.clone(),
710 body: t.body.clone(),
711 posted_at: Some(prost_types::Timestamp {
712 seconds: t.posted_at,
713 nanos: 0,
714 }),
715 })
716 .collect(),
717 resolution: Some(discussion_resolution_to_proto(&d.resolution)),
718 body_changed_since_open: d.body_changed_since_open,
719 orphaned: d.orphaned,
720 visibility: d.visibility.as_str().to_string(),
721 }
722}
723
724fn discussion_resolution_to_proto(
725 resolution: &DiscussionResolution,
726) -> grpc::heddle::v1::DiscussionResolution {
727 use grpc::heddle::v1::discussion_resolution::{
728 Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
729 };
730 let state = match resolution {
731 DiscussionResolution::Open => State::Open(Open {}),
732 DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
733 State::IntoAnnotation(ResolvedIntoAnnotation {
734 annotation_id: annotation_id.clone(),
735 })
736 }
737 DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
738 state_id: state_id.as_bytes().to_vec(),
739 }),
740 DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
741 reason: reason.clone(),
742 }),
743 };
744 grpc::heddle::v1::DiscussionResolution { state: Some(state) }
745}
746
747#[derive(Debug, Clone)]
755struct ChangedPath {
756 path: String,
757 kind: DiffKind,
758}
759
760impl ChangedPath {
761 fn kind_str(&self) -> &'static str {
762 match self.kind {
763 DiffKind::Added => "added",
764 DiffKind::Modified => "modified",
765 DiffKind::Deleted => "deleted",
766 DiffKind::Unchanged => "unchanged",
767 }
768 }
769}
770
/// Aggregate diff statistics for one state relative to its first parent, as
/// produced by `compute_state_diff_summary`.
struct DiffSummary {
    /// Total number of changed paths (the length of `changed_paths`).
    files_changed: u32,
    /// Number of paths whose change kind is `Added`.
    added_files: u32,
    /// Number of paths whose change kind is `Modified`.
    modified_files: u32,
    /// Number of paths whose change kind is `Deleted`.
    deleted_files: u32,
    /// Lines added: every line of an added file plus added diff lines of
    /// modified files.
    added_lines: u32,
    /// Lines removed: every line of a deleted file plus removed diff lines of
    /// modified files.
    removed_lines: u32,
    /// The changed paths, in the order the change set yielded them
    /// (unchanged entries excluded).
    changed_paths: Vec<ChangedPath>,
}
786
787fn compute_state_diff_summary(
796 repo: &Repository,
797 state: &State,
798) -> objects::error::Result<DiffSummary> {
799 use objects::object::Tree;
800 let parent_tree_hash = if let Some(parent_id) = state.parents.first() {
801 match repo.store().get_state(parent_id)? {
802 Some(parent_state) => parent_state.tree,
803 None => Tree::new().hash(),
804 }
805 } else {
806 Tree::new().hash()
807 };
808
809 let parent_tree_obj = repo.store().get_tree(&parent_tree_hash)?;
815 let new_tree_obj = repo.store().get_tree(&state.tree)?;
816
817 let changes = if parent_tree_obj.is_some() && new_tree_obj.is_some() {
823 repo.diff_trees(&parent_tree_hash, &state.tree)?
824 } else {
825 objects::object::FileChangeSet::new()
826 };
827
828 let mut added_lines: u32 = 0;
835 let mut removed_lines: u32 = 0;
836 let mut changed_paths: Vec<ChangedPath> = Vec::with_capacity(changes.len());
837
838 let parent_files = match parent_tree_obj.as_ref() {
839 Some(t) => collect_files(repo, t, "")?,
840 None => std::collections::HashMap::new(),
841 };
842 let new_files = match new_tree_obj.as_ref() {
843 Some(t) => collect_files(repo, t, "")?,
844 None => std::collections::HashMap::new(),
845 };
846
847 let mut added_files: u32 = 0;
848 let mut modified_files: u32 = 0;
849 let mut deleted_files: u32 = 0;
850
851 for change in changes.iter() {
852 match change.kind {
853 DiffKind::Added => {
854 added_files += 1;
855 if let Some(hash) = new_files.get(&change.path)
862 && let Some(blob) = repo.store().get_blob(hash)?
863 {
864 added_lines = added_lines.saturating_add(line_count(blob.content()));
865 }
866 }
867 DiffKind::Deleted => {
868 deleted_files += 1;
869 if let Some(hash) = parent_files.get(&change.path)
870 && let Some(blob) = repo.store().get_blob(hash)?
871 {
872 removed_lines = removed_lines.saturating_add(line_count(blob.content()));
873 }
874 }
875 DiffKind::Modified => {
876 modified_files += 1;
877 let old_blob = match parent_files.get(&change.path) {
884 Some(h) => repo.store().get_blob(h)?,
885 None => None,
886 };
887 let new_blob = match new_files.get(&change.path) {
888 Some(h) => repo.store().get_blob(h)?,
889 None => None,
890 };
891 if let (Some(old), Some(new)) = (old_blob, new_blob) {
892 for line in diff_blobs(&old, &new) {
893 match line {
894 objects::worktree::DiffLine::Added(_) => {
895 added_lines = added_lines.saturating_add(1);
896 }
897 objects::worktree::DiffLine::Removed(_) => {
898 removed_lines = removed_lines.saturating_add(1);
899 }
900 objects::worktree::DiffLine::Context(_) => {}
901 }
902 }
903 }
904 }
905 DiffKind::Unchanged => continue,
906 }
907 changed_paths.push(ChangedPath {
908 path: change.path.clone(),
909 kind: change.kind,
910 });
911 }
912
913 Ok(DiffSummary {
914 files_changed: changed_paths.len() as u32,
915 added_files,
916 modified_files,
917 deleted_files,
918 added_lines,
919 removed_lines,
920 changed_paths,
921 })
922}
923
/// Counts the lines in `content`.
///
/// Content that is empty or not valid UTF-8 counts as zero lines. A single
/// trailing newline does not start an extra line, so `"a\n"` and `"a"` both
/// count as one line, and a lone `"\n"` also counts as one.
fn line_count(content: &[u8]) -> u32 {
    match std::str::from_utf8(content) {
        Err(_) | Ok("") => 0,
        Ok(text) => {
            // Ignore one terminating newline, then count separators + 1.
            let body = text.strip_suffix('\n').unwrap_or(text);
            let separators = body.bytes().filter(|&b| b == b'\n').count() as u32;
            separators.saturating_add(1)
        }
    }
}
941
942#[cfg(test)]
947mod tests {
948 use std::sync::Arc;
949
950 use crypto::Signer as _;
951 use grpc::heddle::v1::ReviewScope as ProtoReviewScope;
952 use repo::{Repository, operation_dedup::OperationDedupStore};
953 use tempfile::TempDir;
954
955 use super::*;
956
    /// Creates a service over a freshly initialised repository in a new
    /// temporary directory.
    ///
    /// Returns the service, the shared repository handle and the `TempDir`
    /// guard; callers bind the guard so the directory outlives the test.
    /// Also sets the principal environment variables to "Alice Tester".
    fn fresh_service() -> (LocalStateReviewService, Arc<Repository>, TempDir) {
        let temp = TempDir::new().expect("create tempdir");
        // NOTE(review): `set_var` mutates process-global state. The tests in
        // this module are tagged `serial(principal_env)`, which presumably
        // keeps them from racing on these variables — confirm that no other
        // thread reads the environment concurrently.
        unsafe {
            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
        }
        let repo = Repository::init_default(temp.path()).expect("init repo");
        let dedup = OperationDedupStore::open(repo.heddle_dir()).expect("open dedup");
        let repo = Arc::new(repo);
        let svc =
            LocalStateReviewService::new(GrpcLocalService::new(repo.clone(), Arc::new(dedup)));
        (svc, repo, temp)
    }
973
974 fn capture_state(repo: &Repository) -> ChangeId {
975 std::fs::write(repo.root().join("hello.txt"), b"hi").expect("write file");
977 let state = repo
978 .snapshot(Some("seed".to_string()), None)
979 .expect("snapshot");
980 state.change_id
981 }
982
983 fn sign_request(state_id: &ChangeId, op_id: &str) -> SignStateRequest {
984 let signer = crypto::Ed25519Signer::generate().expect("generate ed25519 key");
985 let scope = ReviewScope::WholeChange;
986 let signed_at = chrono::Utc::now().timestamp();
987 let payload = signing_payload(*state_id, ReviewKind::Read, &scope, signed_at, None);
988 let signature = signer.sign(&payload).expect("sign payload");
989 use grpc::heddle::v1::review_scope::{Scope, WholeChange};
990 SignStateRequest {
991 repo_path: String::new(),
992 state_id: state_id.as_bytes().to_vec(),
993 kind: grpc::heddle::v1::ReviewKind::Read as i32,
994 scope: Some(ProtoReviewScope {
995 scope: Some(Scope::WholeChange(WholeChange {})),
996 }),
997 justification: String::new(),
998 algorithm: "ed25519".to_string(),
999 public_key: signer.public_key().to_vec(),
1000 signature: signature.clone(),
1001 signed_at: Some(prost_types::Timestamp {
1002 seconds: signed_at,
1003 nanos: 0,
1004 }),
1005 client_operation_id: op_id.to_string(),
1006 }
1007 }
1008
1009 #[tokio::test]
1010 #[serial_test::serial(principal_env)]
1011 async fn sign_state_persists_to_review_signatures_blob() {
1012 let (svc, repo, _tmp) = fresh_service();
1013 let state_id = capture_state(&repo);
1014
1015 let resp = svc
1016 .sign_state(Request::new(sign_request(&state_id, "")))
1017 .await
1018 .expect("sign_state");
1019 assert!(!resp.get_ref().signature_id.is_empty());
1020 assert_eq!(resp.get_ref().state_id, state_id.as_bytes().to_vec());
1021
1022 let listing = svc
1023 .list_signatures(Request::new(ListSignaturesRequest {
1024 repo_path: String::new(),
1025 state_id: state_id.as_bytes().to_vec(),
1026 }))
1027 .await
1028 .expect("list_signatures");
1029 let sigs = &listing.get_ref().signatures;
1030 assert_eq!(sigs.len(), 1, "expected one signature, got {sigs:?}");
1031 assert_eq!(sigs[0].kind, grpc::heddle::v1::ReviewKind::Read as i32);
1032 assert_eq!(sigs[0].algorithm, "ed25519");
1033 assert_eq!(sigs[0].actor_name, "Alice Tester");
1034 assert_eq!(sigs[0].actor_email, "alice@example.com");
1035 let scope_case = sigs[0].scope.as_ref().and_then(|s| s.scope.as_ref());
1036 assert!(matches!(
1037 scope_case,
1038 Some(grpc::heddle::v1::review_scope::Scope::WholeChange(_))
1039 ));
1040 }
1041
1042 #[tokio::test]
1043 #[serial_test::serial(principal_env)]
1044 async fn sign_state_idempotent() {
1045 let (svc, repo, _tmp) = fresh_service();
1046 let state_id = capture_state(&repo);
1047 let op_id = objects::object::OperationId::new().to_string();
1048 let req = sign_request(&state_id, &op_id);
1051
1052 let first = svc
1053 .sign_state(Request::new(req.clone()))
1054 .await
1055 .expect("first sign_state");
1056 let second = svc
1057 .sign_state(Request::new(req))
1058 .await
1059 .expect("second sign_state");
1060 assert_eq!(
1061 first.get_ref().signature_id,
1062 second.get_ref().signature_id,
1063 "replayed call must return the same signature_id"
1064 );
1065
1066 let listing = svc
1067 .list_signatures(Request::new(ListSignaturesRequest {
1068 repo_path: String::new(),
1069 state_id: state_id.as_bytes().to_vec(),
1070 }))
1071 .await
1072 .expect("list_signatures");
1073 assert_eq!(
1074 listing.get_ref().signatures.len(),
1075 1,
1076 "idempotent replay must not append a duplicate signature"
1077 );
1078 }
1079
1080 #[tokio::test]
1081 #[serial_test::serial(principal_env)]
1082 async fn sign_state_rejects_forged_signature() {
1083 let (svc, repo, _tmp) = fresh_service();
1084 let state_id = capture_state(&repo);
1085 let mut req = sign_request(&state_id, "");
1086 let last = req.signature.len() - 1;
1088 req.signature[last] ^= 0xff;
1089
1090 let err = svc
1091 .sign_state(Request::new(req))
1092 .await
1093 .expect_err("forged signature must be rejected");
1094 assert_eq!(err.code(), tonic::Code::InvalidArgument, "{err:?}");
1095 assert!(
1096 err.message().contains("failed verification"),
1097 "unexpected error message: {}",
1098 err.message()
1099 );
1100 }
1101
1102 #[tokio::test]
1103 #[serial_test::serial(principal_env)]
1104 async fn sign_state_rejects_skewed_timestamp() {
1105 let (svc, repo, _tmp) = fresh_service();
1106 let state_id = capture_state(&repo);
1107 let mut req = sign_request(&state_id, "");
1108 if let Some(ts) = req.signed_at.as_mut() {
1110 ts.seconds += 60 * 60;
1111 }
1112
1113 let err = svc
1114 .sign_state(Request::new(req))
1115 .await
1116 .expect_err("skewed timestamp must be rejected");
1117 assert_eq!(err.code(), tonic::Code::InvalidArgument);
1118 assert!(err.message().contains("too far from server time"));
1119 }
1120
    /// The stored signature must carry the identity of whoever calls
    /// `sign_state`, not the identity that captured the state.
    #[tokio::test]
    #[serial_test::serial(principal_env)]
    async fn sign_state_attributes_to_caller_not_state_author() {
        // `fresh_service` sets the principal to Alice, so the state below is
        // captured while Alice is the active principal.
        let (svc, repo, _tmp) = fresh_service();
        let state_id = capture_state(&repo);

        // Switch the active principal to Bob before signing.
        // NOTE(review): process-global env mutation; relies on the
        // `serial(principal_env)` tag to avoid racing with other tests.
        unsafe {
            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Bob Signer");
            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "bob@example.com");
        }

        svc.sign_state(Request::new(sign_request(&state_id, "")))
            .await
            .expect("sign_state");

        let listing = svc
            .list_signatures(Request::new(ListSignaturesRequest {
                repo_path: String::new(),
                state_id: state_id.as_bytes().to_vec(),
            }))
            .await
            .expect("list_signatures");
        let sigs = &listing.get_ref().signatures;
        assert_eq!(sigs.len(), 1);
        assert_eq!(
            sigs[0].actor_name, "Bob Signer",
            "signature must attribute to the caller (Bob), not the state author (Alice)"
        );
        assert_eq!(sigs[0].actor_email, "bob@example.com");

        // Restore the default principal. This is skipped if an assertion
        // above panics, but `fresh_service` resets the variables anyway.
        unsafe {
            std::env::set_var("HEDDLE_PRINCIPAL_NAME", "Alice Tester");
            std::env::set_var("HEDDLE_PRINCIPAL_EMAIL", "alice@example.com");
        }
    }
1168
    /// Two signing requests with distinct operation ids, driven together,
    /// must both end up in the signatures blob.
    #[tokio::test]
    #[serial_test::serial(principal_env)]
    async fn sign_state_serializes_concurrent_appends() {
        let (svc, repo, _tmp) = fresh_service();
        let state_id = capture_state(&repo);

        // Distinct operation ids so neither call is treated as a replay of
        // the other.
        let op_a = objects::object::OperationId::new().to_string();
        let op_b = objects::object::OperationId::new().to_string();
        let req_a = sign_request(&state_id, &op_a);
        let req_b = sign_request(&state_id, &op_b);

        let svc_a = svc.clone();
        let svc_b = svc.clone();
        // Drive both futures together and wait for both to finish.
        let (a, b) = tokio::join!(
            svc_a.sign_state(Request::new(req_a)),
            svc_b.sign_state(Request::new(req_b)),
        );
        a.expect("first sign_state");
        b.expect("second sign_state");

        let listing = svc
            .list_signatures(Request::new(ListSignaturesRequest {
                repo_path: String::new(),
                state_id: state_id.as_bytes().to_vec(),
            }))
            .await
            .expect("list_signatures");
        assert_eq!(
            listing.get_ref().signatures.len(),
            2,
            "both concurrent signatures must land — neither should be lost \
             to a stale-blob clobber"
        );
    }
1210
1211 #[tokio::test]
1219 #[serial_test::serial(principal_env)]
1220 async fn get_review_payload_populates_diff_summary_and_signals() {
1221 let (svc, repo, _tmp) = fresh_service();
1222
1223 std::fs::write(repo.root().join("hello.txt"), b"first\nsecond\nthird\n")
1226 .expect("write hello.txt");
1227 let first = repo
1228 .snapshot(Some("first capture".to_string()), None)
1229 .expect("first snapshot");
1230
1231 let resp_first = svc
1232 .get_review_payload(Request::new(GetReviewPayloadRequest {
1233 repo_path: String::new(),
1234 state_id: first.change_id.as_bytes().to_vec(),
1235 include_all_signals: false,
1236 }))
1237 .await
1238 .expect("get_review_payload first");
1239 let payload_first = resp_first.into_inner();
1240 let summary_first = payload_first.summary.as_ref().expect("summary present");
1241 assert!(
1242 summary_first.files_changed >= 1,
1243 "first state should report at least one file changed (vs empty parent), got {}",
1244 summary_first.files_changed
1245 );
1246 assert!(
1247 summary_first.added_lines >= 3,
1248 "first state should report 3+ added lines, got {}",
1249 summary_first.added_lines
1250 );
1251 assert!(
1252 !payload_first.in_budget_signals.is_empty(),
1253 "in_budget_signals must include a diff_summary entry"
1254 );
1255 let first_signal = &payload_first.in_budget_signals[0];
1256 assert!(
1257 first_signal.kind.starts_with("diff_summary."),
1258 "expected synthetic diff_summary signal kind, got {}",
1259 first_signal.kind
1260 );
1261 assert_eq!(first_signal.producer_module, "review_show.diff_summary");
1262 assert_eq!(first_signal.visibility, "visible");
1263
1264 std::fs::write(
1268 repo.root().join("hello.txt"),
1269 b"first\nsecond\nthird\nfourth\n",
1270 )
1271 .expect("modify hello.txt");
1272 let second = repo
1273 .snapshot(Some("second capture".to_string()), None)
1274 .expect("second snapshot");
1275
1276 let resp_second = svc
1277 .get_review_payload(Request::new(GetReviewPayloadRequest {
1278 repo_path: String::new(),
1279 state_id: second.change_id.as_bytes().to_vec(),
1280 include_all_signals: false,
1281 }))
1282 .await
1283 .expect("get_review_payload second");
1284 let payload_second = resp_second.into_inner();
1285 let summary_second = payload_second.summary.as_ref().expect("summary present");
1286 assert_eq!(
1287 summary_second.files_changed, 1,
1288 "second state should report exactly one modified file"
1289 );
1290 assert!(
1291 summary_second.added_lines >= 1,
1292 "second state should report at least one added line, got {}",
1293 summary_second.added_lines
1294 );
1295 assert!(
1296 !payload_second.in_budget_signals.is_empty(),
1297 "in_budget_signals must include a diff_summary entry"
1298 );
1299 let signal = &payload_second.in_budget_signals[0];
1300 assert_eq!(
1301 signal
1302 .anchor
1303 .as_ref()
1304 .map(|a| a.file.as_str())
1305 .unwrap_or(""),
1306 "hello.txt",
1307 "diff_summary signal should anchor on the modified file"
1308 );
1309 assert!(
1310 signal.reason.contains("files changed"),
1311 "first signal reason should carry the aggregate summary, got {}",
1312 signal.reason
1313 );
1314 assert_eq!(
1318 summary_second.in_budget_signal_count,
1319 payload_second.in_budget_signals.len() as u32,
1320 "in_budget_signal_count must match the array length"
1321 );
1322 }
1323
1324 #[tokio::test]
1336 #[serial_test::serial(principal_env)]
1337 async fn get_review_payload_tolerates_missing_tree() {
1338 let (svc, repo, _tmp) = fresh_service();
1339 let state_id = capture_state(&repo);
1340
1341 let state = repo
1346 .store()
1347 .get_state(&state_id)
1348 .expect("get state")
1349 .expect("state present");
1350 let bogus_tree = objects::object::ContentHash::compute(b"definitely-not-in-store-bytes");
1351 let mut mutated = state.clone();
1352 mutated.tree = bogus_tree;
1353 repo.store().put_state(&mutated).expect("put mutated state");
1354
1355 let resp = svc
1358 .get_review_payload(Request::new(GetReviewPayloadRequest {
1359 repo_path: String::new(),
1360 state_id: state_id.as_bytes().to_vec(),
1361 include_all_signals: false,
1362 }))
1363 .await
1364 .expect("missing tree must not block review payload");
1365 let payload = resp.into_inner();
1366 let summary = payload.summary.as_ref().expect("summary present");
1367 assert_eq!(
1368 summary.files_changed, 0,
1369 "missing tree must produce a zero-change summary, got {} files",
1370 summary.files_changed
1371 );
1372 assert!(
1376 !payload.in_budget_signals.is_empty(),
1377 "in_budget_signals should always contain at least the synthetic diff_summary entry"
1378 );
1379 let kind = &payload.in_budget_signals[0].kind;
1380 assert!(
1381 kind.starts_with("diff_summary."),
1382 "expected synthetic diff_summary signal, got {kind}"
1383 );
1384 }
1385
/// Pins `line_count` on a fixed table of inputs.
///
/// Covers empty input, a lone newline, text with and without a trailing
/// newline, and a three-byte non-ASCII sequence that is expected to count as
/// zero lines (presumably because it is treated as binary — confirm against
/// `line_count`).
#[test]
fn line_count_matches_git_semantics() {
    let cases: [(&[u8], _); 8] = [
        (b"", 0),
        (b"\n", 1),
        (b"hello", 1),
        (b"hello\n", 1),
        (b"hello\nworld", 2),
        (b"hello\nworld\n", 2),
        (b"a\nb\nc\n", 3),
        (&[0xff, 0xfe, 0xfd], 0),
    ];

    for &(input, expected) in &cases {
        assert_eq!(line_count(input), expected, "line_count({input:?})");
    }
}
1401}