1use std::collections::HashSet;
16
17use grpc::heddle::v1::{
18 AppendTurnRequest, ContextAnnotationKind, Discussion as ProtoDiscussion,
19 DiscussionResolution as ProtoDiscussionResolution, DiscussionTurn as ProtoDiscussionTurn,
20 GetDiscussionRequest, ListDiscussionsByStateRequest, ListDiscussionsBySymbolRequest,
21 ListDiscussionsResponse, OpenDiscussionRequest, PathSymbolRef, ResolveDiscussionRequest,
22 discussion_service_server::DiscussionService,
23 resolve_discussion_request::ResolveIntoAnnotation,
24};
25use objects::{
26 lock::RepositoryLockExt,
27 object::{
28 Annotation, AnnotationKind, AnnotationScope, Blob, ChangeId, ContentHash, ContextBlob,
29 ContextTarget, Discussion, DiscussionResolution, DiscussionTurn, DiscussionsBlob,
30 Principal, State, SymbolAnchor, VisibilityTier,
31 },
32 store::ObjectStore,
33};
34use prost::Message;
35use refs::Head;
36use repo::Repository;
37use tonic::{Request, Response, Status};
38
39use super::{GrpcLocalService, to_status, with_idempotency};
40
41#[derive(Clone)]
42pub struct LocalDiscussionService {
43 inner: GrpcLocalService,
44}
45
46impl LocalDiscussionService {
47 pub fn new(inner: GrpcLocalService) -> Self {
48 Self { inner }
49 }
50}
51
52fn now_secs() -> i64 {
53 std::time::SystemTime::now()
54 .duration_since(std::time::UNIX_EPOCH)
55 .map(|d| d.as_secs() as i64)
56 .unwrap_or(0)
57}
58
59fn parse_visibility(s: &str) -> Result<VisibilityTier, Status> {
63 let trimmed = s.trim();
64 match trimmed {
65 "" | "public" => return Ok(VisibilityTier::Public),
66 "internal" => return Ok(VisibilityTier::Internal),
67 _ => {}
68 }
69
70 if let Some(team_id) = trimmed.strip_prefix("team:") {
71 let team_id = team_id.trim();
72 if team_id.is_empty() {
73 return Err(Status::invalid_argument(
74 "discussion visibility team:<id> requires a non-empty id",
75 ));
76 }
77 return Ok(VisibilityTier::TeamScoped {
78 team_id: team_id.to_string(),
79 });
80 }
81 if let Some(scope_label) = trimmed.strip_prefix("restricted:") {
82 let scope_label = scope_label.trim();
83 if scope_label.is_empty() {
84 return Err(Status::invalid_argument(
85 "discussion visibility restricted:<label> requires a non-empty label",
86 ));
87 }
88 return Ok(VisibilityTier::Restricted {
89 scope_label: scope_label.to_string(),
90 });
91 }
92 if let Some(scope_label) = trimmed.strip_prefix("private:") {
93 let scope_label = scope_label.trim();
94 if scope_label.is_empty() {
95 return Err(Status::invalid_argument(
96 "discussion visibility private:<label> requires a non-empty label",
97 ));
98 }
99 return Ok(VisibilityTier::Private {
100 scope_label: scope_label.to_string(),
101 });
102 }
103 Err(Status::invalid_argument(format!(
104 "unsupported discussion visibility {trimmed:?}; expected public, internal, team:<id>, restricted:<label>, or private:<label>"
105 )))
106}
107
108fn visibility_to_wire(visibility: &VisibilityTier) -> String {
109 match visibility {
110 VisibilityTier::Public | VisibilityTier::Internal => visibility.as_str().to_string(),
111 VisibilityTier::TeamScoped { team_id } => format!("team:{team_id}"),
112 VisibilityTier::Restricted { scope_label } => format!("restricted:{scope_label}"),
113 VisibilityTier::Private { scope_label } => format!("private:{scope_label}"),
114 }
115}
116
117fn turn_to_proto(turn: &DiscussionTurn) -> ProtoDiscussionTurn {
118 ProtoDiscussionTurn {
119 author_name: turn.author.name.clone(),
120 author_email: turn.author.email.clone(),
121 body: turn.body.clone(),
122 posted_at: Some(prost_types::Timestamp {
123 seconds: turn.posted_at,
124 nanos: 0,
125 }),
126 }
127}
128
129fn resolution_to_proto(resolution: &DiscussionResolution) -> ProtoDiscussionResolution {
130 use grpc::heddle::v1::discussion_resolution::{
131 Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
132 };
133 let state = match resolution {
134 DiscussionResolution::Open => State::Open(Open {}),
135 DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
136 State::IntoAnnotation(ResolvedIntoAnnotation {
137 annotation_id: annotation_id.clone(),
138 })
139 }
140 DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
141 state_id: state_id.as_bytes().to_vec(),
142 }),
143 DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
144 reason: reason.clone(),
145 }),
146 };
147 ProtoDiscussionResolution { state: Some(state) }
148}
149
150fn discussion_to_proto(d: &Discussion) -> ProtoDiscussion {
151 ProtoDiscussion {
152 id: d.id.clone(),
153 anchor: Some(PathSymbolRef {
154 file: d.anchor.file.clone(),
155 symbol: d.anchor.symbol.clone(),
156 }),
157 opened_against_state: d.opened_against_state.as_bytes().to_vec(),
158 opened_at: Some(prost_types::Timestamp {
159 seconds: d.opened_at,
160 nanos: 0,
161 }),
162 thread_ref: d.thread_ref.clone().unwrap_or_default(),
163 turns: d.turns.iter().map(turn_to_proto).collect(),
164 resolution: Some(resolution_to_proto(&d.resolution)),
165 body_changed_since_open: d.body_changed_since_open,
166 orphaned: d.orphaned,
167 visibility: visibility_to_wire(&d.visibility),
168 resolved_annotation_id: d.resolved_annotation_id.clone().unwrap_or_default(),
169 }
170}
171
172fn load_state(repo: &Repository, state_id: &[u8]) -> Result<(ChangeId, State), Status> {
175 let id = ChangeId::try_from_slice(state_id)
176 .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))?;
177 let state = repo
178 .store()
179 .get_state(&id)
180 .map_err(to_status)?
181 .ok_or_else(|| Status::not_found(format!("state {} not found", id.to_string_full())))?;
182 Ok((id, state))
183}
184
185fn decode_blob_for_state(repo: &Repository, state: &State) -> Result<DiscussionsBlob, Status> {
188 let Some(hash) = state.discussions else {
189 return Ok(DiscussionsBlob::new(Vec::new()));
190 };
191 let blob = repo
192 .store()
193 .get_blob(&hash)
194 .map_err(to_status)?
195 .ok_or_else(|| {
196 Status::not_found(format!(
197 "discussions blob {} referenced by state {} is missing",
198 hash,
199 state.change_id.to_string_full()
200 ))
201 })?;
202 DiscussionsBlob::decode(blob.content())
203 .map_err(|err| Status::internal(format!("failed to decode discussions blob: {err}")))
204}
205
206fn load_discussions_blob(
208 repo: &Repository,
209 state_id: &ChangeId,
210) -> Result<(State, DiscussionsBlob), Status> {
211 let state = repo
212 .store()
213 .get_state(state_id)
214 .map_err(to_status)?
215 .ok_or_else(|| {
216 Status::not_found(format!("state {} not found", state_id.to_string_full()))
217 })?;
218 let blob = decode_blob_for_state(repo, &state)?;
219 Ok((state, blob))
220}
221
222fn save_discussions_blob(
225 repo: &Repository,
226 state: &State,
227 blob: &DiscussionsBlob,
228) -> Result<State, Status> {
229 let hash = put_discussions_blob(repo, blob)?;
230 let new_state = state.clone().with_discussions(hash);
231 repo.store().put_state(&new_state).map_err(to_status)?;
232 Ok(new_state)
233}
234
235fn principal_for(repo: &Repository) -> Principal {
240 repo.get_principal()
241 .unwrap_or_else(|_| Principal::new("<unknown>", ""))
242}
243
244fn head_state(repo: &Repository) -> Result<State, Status> {
247 let head_id = repo
248 .head()
249 .map_err(to_status)?
250 .ok_or_else(|| Status::failed_precondition("repository has no HEAD"))?;
251 repo.store()
252 .get_state(&head_id)
253 .map_err(to_status)?
254 .ok_or_else(|| {
255 Status::not_found(format!("HEAD state {} not found", head_id.to_string_full()))
256 })
257}
258
259fn status_matches(d: &Discussion, status: &str) -> bool {
262 match status {
263 "open" => d.is_open(),
264 "resolved" => !d.is_open(),
265 "orphaned" => d.orphaned,
266 _ => true,
268 }
269}
270
271fn put_discussions_blob(repo: &Repository, blob: &DiscussionsBlob) -> Result<ContentHash, Status> {
272 let bytes = blob
273 .encode()
274 .map_err(|err| Status::internal(format!("failed to encode discussions blob: {err}")))?;
275 repo.store().put_blob(&Blob::new(bytes)).map_err(to_status)
276}
277
278fn reachable_discussions(repo: &Repository) -> Result<Vec<(ChangeId, Discussion)>, Status> {
279 let mut out = Vec::new();
280 let mut seen = HashSet::new();
281
282 if let Some(head_id) = repo.head().map_err(to_status)?
283 && let Some(head) = repo.store().get_state(&head_id).map_err(to_status)?
284 {
285 push_discussions_from_state(repo, head_id, &head, &mut seen, &mut out)?;
286 }
287
288 for state_id in repo
289 .reachable_states()
290 .map_err(|err| Status::internal(format!("walk reachable states: {err}")))?
291 {
292 let Some(state) = repo.store().get_state(&state_id).map_err(to_status)? else {
293 continue;
294 };
295 push_discussions_from_state(repo, state_id, &state, &mut seen, &mut out)?;
296 }
297
298 Ok(out)
299}
300
301fn push_discussions_from_state(
302 repo: &Repository,
303 state_id: ChangeId,
304 state: &State,
305 seen: &mut HashSet<String>,
306 out: &mut Vec<(ChangeId, Discussion)>,
307) -> Result<(), Status> {
308 let blob = decode_blob_for_state(repo, state)?;
309 for discussion in blob.discussions {
310 if seen.insert(discussion.id.clone()) {
311 out.push((state_id, discussion));
312 }
313 }
314 Ok(())
315}
316
317fn annotation_kind_from_proto(kind: i32) -> Result<AnnotationKind, Status> {
318 match ContextAnnotationKind::try_from(kind)
319 .map_err(|_| Status::invalid_argument(format!("unknown annotation kind tag {kind}")))?
320 {
321 ContextAnnotationKind::Unspecified | ContextAnnotationKind::Rationale => {
322 Ok(AnnotationKind::Rationale)
323 }
324 ContextAnnotationKind::Constraint => Ok(AnnotationKind::Constraint),
325 ContextAnnotationKind::Invariant => Ok(AnnotationKind::Invariant),
326 }
327}
328
329fn resolve_discussion_into_annotation(
330 repo: &Repository,
331 head: &State,
332 discussions: &mut DiscussionsBlob,
333 discussion_index: usize,
334 payload: ResolveIntoAnnotation,
335) -> Result<Discussion, Status> {
336 if payload.content.trim().is_empty() {
337 return Err(Status::invalid_argument(
338 "into-annotation resolution requires non-empty content",
339 ));
340 }
341 let kind = annotation_kind_from_proto(payload.kind)?;
342 let attribution = repo
343 .get_attribution()
344 .map_err(|err| Status::internal(format!("resolve attribution: {err}")))?;
345
346 let discussion = discussions
347 .discussions
348 .get(discussion_index)
349 .ok_or_else(|| Status::internal("discussion index out of range"))?
350 .clone();
351 let target = ContextTarget::file(discussion.anchor.file.clone())
352 .map_err(|err| Status::invalid_argument(err.to_string()))?;
353 let mut scope = AnnotationScope::Symbol {
354 name: discussion.anchor.symbol.clone(),
355 resolved_lines: None,
356 };
357 target
358 .validate_scope(&scope)
359 .map_err(|err| Status::invalid_argument(err.to_string()))?;
360 let source = target.path().and_then(|path| {
361 std::fs::read(repo.root().join(path))
362 .ok()
363 .map(|bytes| (path.to_string(), bytes))
364 });
365 scope = resolve_annotation_scope(
366 source
367 .as_ref()
368 .map(|(path, bytes)| (path.as_str(), bytes.as_slice())),
369 scope,
370 );
371 let source_hash =
372 compute_annotation_source_hash(source.as_ref().map(|(_, bytes)| bytes.as_slice()), &scope);
373
374 let mut annotation = Annotation::new(
375 scope,
376 kind,
377 payload.content,
378 payload.tags,
379 attribution.to_string(),
380 now_secs(),
381 source_hash,
382 Some(head.change_id),
383 );
384 annotation.resolved_from_discussion = Some(discussion.id.clone());
385 annotation.visibility = discussion.visibility.clone();
386 let annotation_id = annotation.annotation_id.clone();
387
388 let mut context = match head.context {
389 Some(root) => repo
390 .get_context_blob(&root, &target)
391 .map_err(to_status)?
392 .unwrap_or_else(|| ContextBlob::new(Vec::new())),
393 None => ContextBlob::new(Vec::new()),
394 };
395 context.annotations.push(annotation);
396 let context_root = repo
397 .set_context_blob(head.context.as_ref(), &target, &context)
398 .map_err(to_status)?;
399
400 let updated = discussions
401 .discussions
402 .get_mut(discussion_index)
403 .ok_or_else(|| Status::internal("discussion index out of range"))?;
404 updated.resolution = DiscussionResolution::ResolvedIntoAnnotation {
405 annotation_id: annotation_id.clone(),
406 };
407 updated.resolved_annotation_id = Some(annotation_id);
408 updated
409 .validate()
410 .map_err(|err| Status::invalid_argument(err.to_string()))?;
411 let updated = updated.clone();
412
413 let discussions_hash = put_discussions_blob(repo, discussions)?;
414 let mut new_state = State::new(head.tree, vec![head.change_id], attribution)
415 .with_intent(format!(
416 "discussion: resolve {} into annotation",
417 updated.id
418 ))
419 .with_context(context_root)
420 .with_discussions(discussions_hash);
421 if let Some(provenance) = head.provenance {
422 new_state = new_state.with_provenance(provenance);
423 }
424 if let Some(risk_signals) = head.risk_signals {
425 new_state = new_state.with_risk_signals(risk_signals);
426 }
427 if let Some(review_signatures) = head.review_signatures {
428 new_state = new_state.with_review_signatures(review_signatures);
429 }
430 if let Some(structured_conflicts) = head.structured_conflicts {
431 new_state = new_state.with_structured_conflicts(structured_conflicts);
432 }
433 repo.put_authored_state(&mut new_state).map_err(to_status)?;
434 advance_head(repo, &new_state).map_err(to_status)?;
435
436 Ok(updated)
437}
438
439fn resolve_annotation_scope(
440 source: Option<(&str, &[u8])>,
441 scope: AnnotationScope,
442) -> AnnotationScope {
443 let AnnotationScope::Symbol {
444 name,
445 resolved_lines: None,
446 } = scope
447 else {
448 return scope;
449 };
450 let Some((path, source)) = source else {
451 return AnnotationScope::Symbol {
452 name,
453 resolved_lines: None,
454 };
455 };
456 #[cfg(feature = "semantic")]
457 {
458 match repo::symbol_resolver::resolve_symbol_lines(source, std::path::Path::new(path), &name)
459 {
460 Ok((start, end)) => AnnotationScope::Symbol {
461 name,
462 resolved_lines: Some((start, end)),
463 },
464 Err(_) => AnnotationScope::Symbol {
465 name,
466 resolved_lines: None,
467 },
468 }
469 }
470 #[cfg(not(feature = "semantic"))]
471 {
472 let _ = path;
473 let _ = source;
474 AnnotationScope::Symbol {
475 name,
476 resolved_lines: None,
477 }
478 }
479}
480
481fn compute_annotation_source_hash(
482 source: Option<&[u8]>,
483 scope: &AnnotationScope,
484) -> Option<ContentHash> {
485 let source = source?;
486 let scoped = match scope {
487 AnnotationScope::Lines(start, end) => extract_line_range(source, *start, *end),
488 AnnotationScope::Symbol {
489 resolved_lines: Some((start, end)),
490 ..
491 } => extract_line_range(source, *start, *end),
492 _ => source.to_vec(),
493 };
494 Some(ContentHash::compute(&scoped))
495}
496
497fn extract_line_range(source: &[u8], start: u32, end: u32) -> Vec<u8> {
498 let start_line = start.max(1);
499 let end_line = end.max(start_line);
500 let mut current_line = 1;
501 let mut start_byte = (start_line == 1).then_some(0);
502 let mut end_byte = None;
503
504 for (idx, byte) in source.iter().enumerate() {
505 if *byte != b'\n' {
506 continue;
507 }
508 if current_line == end_line {
509 end_byte = Some(idx + 1);
510 break;
511 }
512 current_line += 1;
513 if current_line == start_line {
514 start_byte = Some(idx + 1);
515 }
516 }
517
518 let Some(start_byte) = start_byte else {
519 return Vec::new();
520 };
521 let end_byte = end_byte.unwrap_or(source.len());
522 if start_byte > end_byte || start_byte > source.len() {
523 return Vec::new();
524 }
525 source[start_byte..end_byte].to_vec()
526}
527
528fn advance_head(repo: &Repository, state: &State) -> repo::Result<()> {
529 match repo.refs().read_head()? {
530 Head::Attached { thread } => repo.refs().set_thread(&thread, &state.change_id),
531 Head::Detached { .. } => repo.refs().write_head(&Head::Detached {
532 state: state.change_id,
533 }),
534 }
535}
536
537#[tonic::async_trait]
538impl DiscussionService for LocalDiscussionService {
539 async fn open_discussion(
540 &self,
541 request: Request<OpenDiscussionRequest>,
542 ) -> Result<Response<ProtoDiscussion>, Status> {
543 let req = request.into_inner();
544 let req_bytes = req.encode_to_vec();
545 let client_op_id = req.client_operation_id.clone();
546 let inner = self.inner.clone();
547
548 let result = with_idempotency(
549 &self.inner,
550 &client_op_id,
551 "discussion.open",
552 &req_bytes,
553 move || {
554 let req = req.clone();
555 let inner = inner.clone();
556 async move {
557 let repo = inner.repo();
558 let anchor_proto = req
559 .anchor
560 .clone()
561 .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
562 if anchor_proto.file.is_empty() {
563 return Err(Status::invalid_argument("anchor.file is required"));
564 }
565 if anchor_proto.symbol.is_empty() {
566 return Err(Status::invalid_argument("anchor.symbol is required"));
567 }
568 if req.body.trim().is_empty() {
569 return Err(Status::invalid_argument("body must be non-empty"));
570 }
571 let opened_against =
572 ChangeId::try_from_slice(&req.state_id).map_err(|err| {
573 Status::invalid_argument(format!("invalid state_id: {err}"))
574 })?;
575 let now = now_secs();
576 let principal = principal_for(repo);
577 let visibility = if req.visibility.trim().is_empty() {
578 repo.resolve_capture_default_visibility()
579 } else {
580 parse_visibility(&req.visibility)?
581 };
582 let discussion = Discussion {
583 id: ChangeId::generate().to_string_full(),
584 anchor: SymbolAnchor::new(anchor_proto.file, anchor_proto.symbol),
585 opened_against_state: opened_against,
586 opened_at: now,
587 thread_ref: (!req.thread_ref.is_empty()).then(|| req.thread_ref.clone()),
588 turns: vec![DiscussionTurn {
589 author: principal,
590 body: req.body.clone(),
591 posted_at: now,
592 }],
593 resolution: DiscussionResolution::Open,
594 body_changed_since_open: false,
595 orphaned: false,
596 visibility,
597 resolved_annotation_id: None,
598 };
599 discussion
600 .validate()
601 .map_err(|err| Status::invalid_argument(err.to_string()))?;
602 let _lock = repo
603 .locker()
604 .write()
605 .map_err(|err| Status::internal(err.to_string()))?;
606 let (state, mut blob) = load_discussions_blob(repo, &opened_against)?;
607 blob.discussions.push(discussion.clone());
608 save_discussions_blob(repo, &state, &blob)?;
609 Ok(discussion_to_proto(&discussion))
610 }
611 },
612 )
613 .await?;
614
615 Ok(Response::new(result))
616 }
617
618 async fn append_turn(
619 &self,
620 request: Request<AppendTurnRequest>,
621 ) -> Result<Response<ProtoDiscussion>, Status> {
622 let req = request.into_inner();
623 let req_bytes = req.encode_to_vec();
624 let client_op_id = req.client_operation_id.clone();
625 let inner = self.inner.clone();
626
627 let result = with_idempotency(
628 &self.inner,
629 &client_op_id,
630 "discussion.append_turn",
631 &req_bytes,
632 move || {
633 let req = req.clone();
634 let inner = inner.clone();
635 async move {
636 let repo = inner.repo();
637 if req.discussion_id.is_empty() {
638 return Err(Status::invalid_argument("discussion_id is required"));
639 }
640 if req.body.trim().is_empty() {
641 return Err(Status::invalid_argument("body must be non-empty"));
642 }
643 let principal = principal_for(repo);
647 let _lock = repo
648 .locker()
649 .write()
650 .map_err(|err| Status::internal(err.to_string()))?;
651 let head = head_state(repo)?;
652 let mut blob = decode_blob_for_state(repo, &head)?;
653 let idx = blob
654 .discussions
655 .iter()
656 .position(|d| d.id == req.discussion_id)
657 .ok_or_else(|| {
658 Status::not_found(format!("discussion {} not found", req.discussion_id))
659 })?;
660 blob.discussions[idx].turns.push(DiscussionTurn {
661 author: principal,
662 body: req.body.clone(),
663 posted_at: now_secs(),
664 });
665 blob.discussions[idx]
666 .validate()
667 .map_err(|err| Status::invalid_argument(err.to_string()))?;
668 let updated = blob.discussions[idx].clone();
669 save_discussions_blob(repo, &head, &blob)?;
670 Ok(discussion_to_proto(&updated))
671 }
672 },
673 )
674 .await?;
675
676 Ok(Response::new(result))
677 }
678
679 async fn resolve_discussion(
680 &self,
681 request: Request<ResolveDiscussionRequest>,
682 ) -> Result<Response<ProtoDiscussion>, Status> {
683 let req = request.into_inner();
684 let req_bytes = req.encode_to_vec();
685 let client_op_id = req.client_operation_id.clone();
686 let inner = self.inner.clone();
687
688 let result = with_idempotency(
689 &self.inner,
690 &client_op_id,
691 "discussion.resolve",
692 &req_bytes,
693 move || {
694 let req = req.clone();
695 let inner = inner.clone();
696 async move {
697 let repo = inner.repo();
698 if req.discussion_id.is_empty() {
699 return Err(Status::invalid_argument("discussion_id is required"));
700 }
701 use grpc::heddle::v1::resolve_discussion_request::Resolution;
705 let resolution = req
706 .resolution
707 .clone()
708 .ok_or_else(|| Status::invalid_argument("resolution mode is required"))?;
709 if let Resolution::Dismissed(ref payload) = resolution
710 && payload.reason.trim().is_empty()
711 {
712 return Err(Status::invalid_argument(
713 "dismissal requires a non-empty reason",
714 ));
715 }
716
717 let _lock = repo
718 .locker()
719 .write()
720 .map_err(|err| Status::internal(err.to_string()))?;
721 let head = head_state(repo)?;
722 let mut blob = decode_blob_for_state(repo, &head)?;
723 let idx = blob
724 .discussions
725 .iter()
726 .position(|d| d.id == req.discussion_id)
727 .ok_or_else(|| {
728 Status::not_found(format!("discussion {} not found", req.discussion_id))
729 })?;
730
731 match resolution {
732 Resolution::IntoAnnotation(payload) => {
733 let updated = resolve_discussion_into_annotation(
734 repo, &head, &mut blob, idx, payload,
735 )?;
736 return Ok(discussion_to_proto(&updated));
737 }
738 Resolution::ByEdit(payload) => {
739 let state_id =
740 ChangeId::try_from_slice(&payload.state_id).map_err(|err| {
741 Status::invalid_argument(format!("invalid state_id: {err}"))
742 })?;
743 blob.discussions[idx].resolution =
744 DiscussionResolution::ResolvedByEdit { state_id };
745 }
746 Resolution::Dismissed(payload) => {
747 blob.discussions[idx].resolution = DiscussionResolution::Dismissed {
748 reason: payload.reason,
749 };
750 }
751 }
752
753 blob.discussions[idx]
754 .validate()
755 .map_err(|err| Status::invalid_argument(err.to_string()))?;
756 let updated = blob.discussions[idx].clone();
757 save_discussions_blob(repo, &head, &blob)?;
758 Ok(discussion_to_proto(&updated))
759 }
760 },
761 )
762 .await?;
763
764 Ok(Response::new(result))
765 }
766
767 async fn list_by_state(
768 &self,
769 request: Request<ListDiscussionsByStateRequest>,
770 ) -> Result<Response<ListDiscussionsResponse>, Status> {
771 let req = request.into_inner();
772 let repo = self.inner.repo();
773 let (_, state) = load_state(repo, &req.state_id)?;
774 let blob = decode_blob_for_state(repo, &state)?;
775 let discussions = blob
776 .discussions
777 .iter()
778 .filter(|d| status_matches(d, &req.status))
779 .map(discussion_to_proto)
780 .collect();
781 Ok(Response::new(ListDiscussionsResponse { discussions }))
782 }
783
784 async fn list_by_symbol(
785 &self,
786 request: Request<ListDiscussionsBySymbolRequest>,
787 ) -> Result<Response<ListDiscussionsResponse>, Status> {
788 let req = request.into_inner();
789 let anchor = req
790 .anchor
791 .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
792 if anchor.file.is_empty() || anchor.symbol.is_empty() {
793 return Err(Status::invalid_argument(
794 "anchor.file and anchor.symbol are required",
795 ));
796 }
797 let repo = self.inner.repo();
798 let discussions = reachable_discussions(repo)?
799 .into_iter()
800 .map(|(_, discussion)| discussion)
801 .filter(|discussion| {
802 discussion.anchor.file == anchor.file
803 && discussion.anchor.symbol == anchor.symbol
804 && status_matches(discussion, &req.status)
805 })
806 .map(|discussion| discussion_to_proto(&discussion))
807 .collect();
808 Ok(Response::new(ListDiscussionsResponse { discussions }))
809 }
810
811 async fn get_discussion(
812 &self,
813 request: Request<GetDiscussionRequest>,
814 ) -> Result<Response<ProtoDiscussion>, Status> {
815 let req = request.into_inner();
816 if req.discussion_id.is_empty() {
817 return Err(Status::invalid_argument("discussion_id is required"));
818 }
819 let repo = self.inner.repo();
822 if req.state_id.is_empty() {
823 let discussion = reachable_discussions(repo)?
824 .into_iter()
825 .map(|(_, discussion)| discussion)
826 .find(|discussion| discussion.id == req.discussion_id)
827 .ok_or_else(|| {
828 Status::not_found(format!("discussion {} not found", req.discussion_id))
829 })?;
830 return Ok(Response::new(discussion_to_proto(&discussion)));
831 }
832
833 let state = load_state(repo, &req.state_id)?.1;
834 let blob = decode_blob_for_state(repo, &state)?;
835 let discussion = blob
836 .discussions
837 .iter()
838 .find(|d| d.id == req.discussion_id)
839 .ok_or_else(|| {
840 Status::not_found(format!("discussion {} not found", req.discussion_id))
841 })?;
842 Ok(Response::new(discussion_to_proto(discussion)))
843 }
844}
845
846#[cfg(test)]
847mod tests {
848 use std::sync::Arc;
849
850 use objects::object::{Attribution, Principal};
851 use repo::{Repository, operation_dedup::OperationDedupStore};
852 use tempfile::TempDir;
853
854 use super::*;
855
856 fn fresh_service() -> (TempDir, ChangeId, LocalDiscussionService) {
857 let temp = TempDir::new().unwrap();
858 let repo = Repository::init_default(temp.path()).unwrap();
859 let attribution = Attribution::human(Principal::new("Tester", "tester@example.com"));
861 let state = repo
862 .snapshot_with_attribution(Some("seed".into()), None, attribution)
863 .unwrap();
864 let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
865 let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
866 let svc = LocalDiscussionService::new(inner);
867 (temp, state.change_id, svc)
868 }
869
870 fn open_request(state_id: &ChangeId, body: &str, op_id: &str) -> OpenDiscussionRequest {
871 OpenDiscussionRequest {
872 repo_path: String::new(),
873 state_id: state_id.as_bytes().to_vec(),
874 anchor: Some(PathSymbolRef {
875 file: "src/lib.rs".into(),
876 symbol: "foo".into(),
877 }),
878 body: body.into(),
879 visibility: String::new(),
880 thread_ref: String::new(),
881 client_operation_id: op_id.into(),
882 }
883 }
884
885 #[tokio::test]
886 #[serial_test::serial(process_global)]
887 async fn open_then_append_turn_persists_both_turns() {
888 let (_t, state_id, svc) = fresh_service();
889 let opened = svc
890 .open_discussion(Request::new(open_request(&state_id, "first", "")))
891 .await
892 .unwrap()
893 .into_inner();
894 assert_eq!(opened.turns.len(), 1);
895 assert_eq!(opened.turns[0].body, "first");
896
897 let appended = svc
898 .append_turn(Request::new(AppendTurnRequest {
899 repo_path: String::new(),
900 discussion_id: opened.id.clone(),
901 body: "second".into(),
902 client_operation_id: String::new(),
903 }))
904 .await
905 .unwrap()
906 .into_inner();
907 assert_eq!(appended.turns.len(), 2);
908 assert_eq!(appended.turns[0].body, "first");
909 assert_eq!(appended.turns[1].body, "second");
910
911 let listed = svc
913 .list_by_state(Request::new(ListDiscussionsByStateRequest {
914 repo_path: String::new(),
915 state_id: state_id.as_bytes().to_vec(),
916 status: "all".into(),
917 }))
918 .await
919 .unwrap()
920 .into_inner();
921 assert_eq!(listed.discussions.len(), 1);
924 assert_eq!(listed.discussions[0].turns.len(), 2);
925 }
926
927 #[tokio::test]
928 #[serial_test::serial(process_global)]
929 async fn open_idempotent_returns_same_discussion() {
930 let (_t, state_id, svc) = fresh_service();
931 let op_id = "11111111-2222-3333-4444-555555555555";
932 let first = svc
933 .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
934 .await
935 .unwrap()
936 .into_inner();
937 let second = svc
938 .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
939 .await
940 .unwrap()
941 .into_inner();
942 assert_eq!(first.id, second.id);
943 assert_eq!(first.turns.len(), 1);
944 assert_eq!(second.turns.len(), 1);
945 }
946
947 #[tokio::test]
948 #[serial_test::serial(process_global)]
949 async fn open_discussion_serializes_concurrent_appends() {
950 let (_t, state_id, svc) = fresh_service();
956 let op_a = objects::object::OperationId::new().to_string();
957 let op_b = objects::object::OperationId::new().to_string();
958 let mut req_a = open_request(&state_id, "body a", &op_a);
959 req_a.anchor.as_mut().unwrap().symbol = "sym_a".into();
960 let mut req_b = open_request(&state_id, "body b", &op_b);
961 req_b.anchor.as_mut().unwrap().symbol = "sym_b".into();
962
963 let svc_a = svc.clone();
964 let svc_b = svc.clone();
965 let (a, b) = tokio::join!(
966 svc_a.open_discussion(Request::new(req_a)),
967 svc_b.open_discussion(Request::new(req_b)),
968 );
969 a.expect("first open_discussion");
970 b.expect("second open_discussion");
971
972 let listed = svc
973 .list_by_state(Request::new(ListDiscussionsByStateRequest {
974 repo_path: String::new(),
975 state_id: state_id.as_bytes().to_vec(),
976 status: "all".into(),
977 }))
978 .await
979 .expect("list_by_state");
980 assert_eq!(
981 listed.get_ref().discussions.len(),
982 2,
983 "both concurrent discussions must land — neither should be lost \
984 to a stale-blob clobber"
985 );
986 }
987
988 #[tokio::test]
989 #[serial_test::serial(process_global)]
990 async fn open_rejects_labelled_visibility_tiers_without_labels() {
991 let (_t, state_id, svc) = fresh_service();
992
993 for visibility in [
994 "team",
995 "team:",
996 "team_scoped",
997 "restricted",
998 "restricted:",
999 "private",
1000 "private:",
1001 "unknown",
1002 ] {
1003 let mut req = open_request(&state_id, "hello", "");
1004 req.visibility = visibility.into();
1005 let err = svc.open_discussion(Request::new(req)).await.unwrap_err();
1006 assert_eq!(err.code(), tonic::Code::InvalidArgument);
1007 }
1008 }
1009
1010 #[tokio::test]
1011 #[serial_test::serial(process_global)]
1012 async fn open_round_trips_supported_visibility_tiers() {
1013 let (_t, state_id, svc) = fresh_service();
1014
1015 for (visibility, expected) in [
1016 ("", "public"),
1017 ("public", "public"),
1018 ("internal", "internal"),
1019 ("team:platform", "team:platform"),
1020 ("restricted:legal", "restricted:legal"),
1021 ("private:embargo-x", "private:embargo-x"),
1022 ] {
1023 let mut req = open_request(&state_id, "hello", "");
1024 req.visibility = visibility.into();
1025 let opened = svc
1026 .open_discussion(Request::new(req))
1027 .await
1028 .unwrap()
1029 .into_inner();
1030 assert_eq!(opened.visibility, expected);
1031 }
1032 }
1033
1034 #[tokio::test]
1035 #[serial_test::serial(process_global)]
1036 async fn open_empty_visibility_uses_repo_discussion_default() {
1037 let temp = TempDir::new().unwrap();
1038 Repository::init_default(temp.path()).unwrap();
1039 std::fs::write(
1040 temp.path().join(".heddle/config.toml"),
1041 "[repository]\nversion = 1\n\n[review.discussion]\ndefault_visibility = \"Internal\"\n",
1042 )
1043 .unwrap();
1044 let repo = Repository::open(temp.path()).unwrap();
1045 let attribution = Attribution::human(Principal::new("Tester", "tester@example.com"));
1046 let state = repo
1047 .snapshot_with_attribution(Some("seed".into()), None, attribution)
1048 .unwrap();
1049 let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
1050 let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
1051 let svc = LocalDiscussionService::new(inner);
1052
1053 let opened = svc
1054 .open_discussion(Request::new(open_request(&state.change_id, "hello", "")))
1055 .await
1056 .unwrap()
1057 .into_inner();
1058
1059 assert_eq!(opened.visibility, "internal");
1060 }
1061
1062 #[tokio::test]
1063 #[serial_test::serial(process_global)]
1064 async fn resolve_dismissed_with_empty_reason_is_invalid_argument() {
1065 let (_t, state_id, svc) = fresh_service();
1066 let opened = svc
1067 .open_discussion(Request::new(open_request(&state_id, "why?", "")))
1068 .await
1069 .unwrap()
1070 .into_inner();
1071
1072 use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveDismissed};
1073 let err = svc
1074 .resolve_discussion(Request::new(ResolveDiscussionRequest {
1075 repo_path: String::new(),
1076 discussion_id: opened.id,
1077 resolution: Some(Resolution::Dismissed(ResolveDismissed {
1078 reason: " ".into(),
1079 })),
1080 client_operation_id: String::new(),
1081 }))
1082 .await
1083 .unwrap_err();
1084 assert_eq!(err.code(), tonic::Code::InvalidArgument);
1085 }
1086
1087 #[tokio::test]
1088 #[serial_test::serial(process_global)]
1089 async fn resolve_into_annotation_creates_context_and_resolves_discussion() {
1090 let (_t, state_id, svc) = fresh_service();
1091 let opened = svc
1092 .open_discussion(Request::new(open_request(&state_id, "why?", "")))
1093 .await
1094 .unwrap()
1095 .into_inner();
1096
1097 use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveIntoAnnotation};
1098 let resolved = svc
1099 .resolve_discussion(Request::new(ResolveDiscussionRequest {
1100 repo_path: String::new(),
1101 discussion_id: opened.id.clone(),
1102 resolution: Some(Resolution::IntoAnnotation(ResolveIntoAnnotation {
1103 kind: ContextAnnotationKind::Rationale as i32,
1104 content: "capture this".into(),
1105 tags: vec!["todo".into()],
1106 })),
1107 client_operation_id: String::new(),
1108 }))
1109 .await
1110 .unwrap()
1111 .into_inner();
1112 let annotation_id = resolved.resolved_annotation_id.clone();
1113 assert!(
1114 !annotation_id.is_empty(),
1115 "into-annotation resolution should return the created annotation id"
1116 );
1117
1118 let repo = svc.inner.repo();
1119 let head_id = repo.head().unwrap().unwrap();
1120 assert_ne!(
1121 head_id, state_id,
1122 "resolving into context should create and publish a new HEAD state"
1123 );
1124 let head = repo.store().get_state(&head_id).unwrap().unwrap();
1125 let context_root = head.context.expect("new state should carry context");
1126 let (target, context, index) = repo
1127 .find_annotation(&context_root, &annotation_id)
1128 .unwrap()
1129 .expect("created annotation should be indexed in the context tree");
1130 assert_eq!(target.path(), Some("src/lib.rs"));
1131 let annotation = &context.annotations[index];
1132 assert_eq!(
1133 annotation.resolved_from_discussion.as_deref(),
1134 Some(opened.id.as_str())
1135 );
1136 assert_eq!(
1137 annotation.current_revision().unwrap().content,
1138 "capture this"
1139 );
1140 assert_eq!(
1141 annotation.current_revision().unwrap().tags,
1142 vec!["todo".to_string()]
1143 );
1144
1145 let discussion_blob = decode_blob_for_state(repo, &head).unwrap();
1146 let stored = discussion_blob
1147 .discussions
1148 .iter()
1149 .find(|discussion| discussion.id == opened.id)
1150 .expect("resolved discussion should still be present on new HEAD");
1151 assert_eq!(
1152 stored.resolved_annotation_id.as_deref(),
1153 Some(annotation_id.as_str())
1154 );
1155 assert!(matches!(
1156 stored.resolution,
1157 DiscussionResolution::ResolvedIntoAnnotation { .. }
1158 ));
1159 }
1160
1161 #[tokio::test]
1162 #[serial_test::serial(process_global)]
1163 async fn list_by_symbol_finds_reachable_discussions() {
1164 let (_t, state_id, svc) = fresh_service();
1165 let opened = svc
1166 .open_discussion(Request::new(open_request(&state_id, "symbol thread", "")))
1167 .await
1168 .unwrap()
1169 .into_inner();
1170
1171 let listed = svc
1172 .list_by_symbol(Request::new(ListDiscussionsBySymbolRequest {
1173 repo_path: String::new(),
1174 anchor: Some(PathSymbolRef {
1175 file: "src/lib.rs".into(),
1176 symbol: "foo".into(),
1177 }),
1178 status: "all".into(),
1179 }))
1180 .await
1181 .unwrap()
1182 .into_inner();
1183 assert_eq!(listed.discussions.len(), 1);
1184 assert_eq!(listed.discussions[0].id, opened.id);
1185 }
1186
1187 #[tokio::test]
1188 #[serial_test::serial(process_global)]
1189 async fn get_discussion_without_state_scans_reachable_discussions() {
1190 let (temp, state_id, svc) = fresh_service();
1191 std::fs::write(temp.path().join("later.txt"), "later\n").unwrap();
1192 svc.inner
1193 .repo()
1194 .snapshot(Some("later".into()), None)
1195 .expect("advance HEAD");
1196
1197 let opened = svc
1198 .open_discussion(Request::new(open_request(&state_id, "old state", "")))
1199 .await
1200 .unwrap()
1201 .into_inner();
1202
1203 let fetched = svc
1204 .get_discussion(Request::new(GetDiscussionRequest {
1205 repo_path: String::new(),
1206 discussion_id: opened.id.clone(),
1207 state_id: Vec::new(),
1208 }))
1209 .await
1210 .unwrap()
1211 .into_inner();
1212 assert_eq!(fetched.id, opened.id);
1213 assert_eq!(fetched.turns[0].body, "old state");
1214 }
1215
1216 #[tokio::test]
1217 #[serial_test::serial(process_global)]
1218 async fn list_by_state_filters_by_status() {
1219 let (_t, state_id, svc) = fresh_service();
1220 let a = svc
1222 .open_discussion(Request::new(open_request(&state_id, "a", "")))
1223 .await
1224 .unwrap()
1225 .into_inner();
1226 let _b = svc
1227 .open_discussion(Request::new(open_request(&state_id, "b", "")))
1228 .await
1229 .unwrap()
1230 .into_inner();
1231
1232 use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveDismissed};
1233 svc.resolve_discussion(Request::new(ResolveDiscussionRequest {
1234 repo_path: String::new(),
1235 discussion_id: a.id.clone(),
1236 resolution: Some(Resolution::Dismissed(ResolveDismissed {
1237 reason: "no longer relevant".into(),
1238 })),
1239 client_operation_id: String::new(),
1240 }))
1241 .await
1242 .unwrap();
1243
1244 let head_state_id = state_id.as_bytes().to_vec();
1256 let open_only = svc
1257 .list_by_state(Request::new(ListDiscussionsByStateRequest {
1258 repo_path: String::new(),
1259 state_id: head_state_id.clone(),
1260 status: "open".into(),
1261 }))
1262 .await
1263 .unwrap()
1264 .into_inner();
1265 assert_eq!(open_only.discussions.len(), 1);
1266 assert_eq!(open_only.discussions[0].turns[0].body, "b");
1267
1268 let resolved_only = svc
1269 .list_by_state(Request::new(ListDiscussionsByStateRequest {
1270 repo_path: String::new(),
1271 state_id: head_state_id.clone(),
1272 status: "resolved".into(),
1273 }))
1274 .await
1275 .unwrap()
1276 .into_inner();
1277 assert_eq!(resolved_only.discussions.len(), 1);
1278 assert_eq!(resolved_only.discussions[0].turns[0].body, "a");
1279
1280 let all = svc
1281 .list_by_state(Request::new(ListDiscussionsByStateRequest {
1282 repo_path: String::new(),
1283 state_id: head_state_id,
1284 status: "all".into(),
1285 }))
1286 .await
1287 .unwrap()
1288 .into_inner();
1289 assert_eq!(all.discussions.len(), 2);
1290 }
1291}