Skip to main content

daemon/grpc_local_impl/
discussion.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local gRPC service for the W2 `DiscussionService`.
3//!
4//! Reads and writes the `DiscussionsBlob` attached to a state via
5//! [`State::with_discussions`]. Open / append / resolve mutations all follow
6//! the same pattern: load the current state, decode (or create fresh) the
7//! existing blob, mutate, encode back to a new [`Blob`], persist a new
8//! `State` with the updated `discussions` content hash.
9//!
10//! State-scoped discovery reads the requested state's blob. Repository-wide
11//! discovery walks reachable states and deduplicates by discussion id, preferring
12//! the current HEAD copy when a carried-forward discussion exists in multiple
13//! states.
14
15use std::collections::HashSet;
16
17use grpc::heddle::v1::{
18    AppendTurnRequest, ContextAnnotationKind, Discussion as ProtoDiscussion,
19    DiscussionResolution as ProtoDiscussionResolution, DiscussionTurn as ProtoDiscussionTurn,
20    GetDiscussionRequest, ListDiscussionsByStateRequest, ListDiscussionsBySymbolRequest,
21    ListDiscussionsResponse, OpenDiscussionRequest, PathSymbolRef, ResolveDiscussionRequest,
22    discussion_service_server::DiscussionService,
23    resolve_discussion_request::ResolveIntoAnnotation,
24};
25use objects::{
26    lock::RepositoryLockExt,
27    object::{
28        Annotation, AnnotationKind, AnnotationScope, Blob, ChangeId, ContentHash, ContextBlob,
29        ContextTarget, Discussion, DiscussionResolution, DiscussionTurn, DiscussionsBlob,
30        Principal, State, SymbolAnchor, VisibilityTier,
31    },
32    store::ObjectStore,
33};
34use prost::Message;
35use refs::Head;
36use repo::Repository;
37use tonic::{Request, Response, Status};
38
39use super::{GrpcLocalService, to_status, with_idempotency};
40
/// Local implementation of the gRPC `DiscussionService`.
///
/// Wraps a [`GrpcLocalService`]; every RPC handler in this module reaches the
/// repository through that handle.
#[derive(Clone)]
pub struct LocalDiscussionService {
    // Shared local-service handle; handlers call `inner.repo()` and pass
    // `&self.inner` to `with_idempotency`.
    inner: GrpcLocalService,
}
45
46impl LocalDiscussionService {
47    pub fn new(inner: GrpcLocalService) -> Self {
48        Self { inner }
49    }
50}
51
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch. The
/// `u64` -> `i64` conversion saturates at `i64::MAX` instead of wrapping, so
/// an out-of-range value can never surface as a negative timestamp.
fn now_secs() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
        })
}
58
59/// Flat discussion RPC visibility vocabulary. Labelled tiers carry their label
60/// in the token itself (`team:<id>`, `restricted:<label>`,
61/// `private:<label>`) so the service never has to manufacture an empty label.
62fn parse_visibility(s: &str) -> Result<VisibilityTier, Status> {
63    let trimmed = s.trim();
64    match trimmed {
65        "" | "public" => return Ok(VisibilityTier::Public),
66        "internal" => return Ok(VisibilityTier::Internal),
67        _ => {}
68    }
69
70    if let Some(team_id) = trimmed.strip_prefix("team:") {
71        let team_id = team_id.trim();
72        if team_id.is_empty() {
73            return Err(Status::invalid_argument(
74                "discussion visibility team:<id> requires a non-empty id",
75            ));
76        }
77        return Ok(VisibilityTier::TeamScoped {
78            team_id: team_id.to_string(),
79        });
80    }
81    if let Some(scope_label) = trimmed.strip_prefix("restricted:") {
82        let scope_label = scope_label.trim();
83        if scope_label.is_empty() {
84            return Err(Status::invalid_argument(
85                "discussion visibility restricted:<label> requires a non-empty label",
86            ));
87        }
88        return Ok(VisibilityTier::Restricted {
89            scope_label: scope_label.to_string(),
90        });
91    }
92    if let Some(scope_label) = trimmed.strip_prefix("private:") {
93        let scope_label = scope_label.trim();
94        if scope_label.is_empty() {
95            return Err(Status::invalid_argument(
96                "discussion visibility private:<label> requires a non-empty label",
97            ));
98        }
99        return Ok(VisibilityTier::Private {
100            scope_label: scope_label.to_string(),
101        });
102    }
103    Err(Status::invalid_argument(format!(
104        "unsupported discussion visibility {trimmed:?}; expected public, internal, team:<id>, restricted:<label>, or private:<label>"
105    )))
106}
107
108fn visibility_to_wire(visibility: &VisibilityTier) -> String {
109    match visibility {
110        VisibilityTier::Public | VisibilityTier::Internal => visibility.as_str().to_string(),
111        VisibilityTier::TeamScoped { team_id } => format!("team:{team_id}"),
112        VisibilityTier::Restricted { scope_label } => format!("restricted:{scope_label}"),
113        VisibilityTier::Private { scope_label } => format!("private:{scope_label}"),
114    }
115}
116
117fn turn_to_proto(turn: &DiscussionTurn) -> ProtoDiscussionTurn {
118    ProtoDiscussionTurn {
119        author_name: turn.author.name.clone(),
120        author_email: turn.author.email.clone(),
121        body: turn.body.clone(),
122        posted_at: Some(prost_types::Timestamp {
123            seconds: turn.posted_at,
124            nanos: 0,
125        }),
126    }
127}
128
129fn resolution_to_proto(resolution: &DiscussionResolution) -> ProtoDiscussionResolution {
130    use grpc::heddle::v1::discussion_resolution::{
131        Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
132    };
133    let state = match resolution {
134        DiscussionResolution::Open => State::Open(Open {}),
135        DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
136            State::IntoAnnotation(ResolvedIntoAnnotation {
137                annotation_id: annotation_id.clone(),
138            })
139        }
140        DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
141            state_id: state_id.as_bytes().to_vec(),
142        }),
143        DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
144            reason: reason.clone(),
145        }),
146    };
147    ProtoDiscussionResolution { state: Some(state) }
148}
149
150fn discussion_to_proto(d: &Discussion) -> ProtoDiscussion {
151    ProtoDiscussion {
152        id: d.id.clone(),
153        anchor: Some(PathSymbolRef {
154            file: d.anchor.file.clone(),
155            symbol: d.anchor.symbol.clone(),
156        }),
157        opened_against_state: d.opened_against_state.as_bytes().to_vec(),
158        opened_at: Some(prost_types::Timestamp {
159            seconds: d.opened_at,
160            nanos: 0,
161        }),
162        thread_ref: d.thread_ref.clone().unwrap_or_default(),
163        turns: d.turns.iter().map(turn_to_proto).collect(),
164        resolution: Some(resolution_to_proto(&d.resolution)),
165        body_changed_since_open: d.body_changed_since_open,
166        orphaned: d.orphaned,
167        visibility: visibility_to_wire(&d.visibility),
168        resolved_annotation_id: d.resolved_annotation_id.clone().unwrap_or_default(),
169    }
170}
171
172/// Resolve a `state_id` string to a stored `State`, returning the parsed
173/// `ChangeId` and the loaded `State`.
174fn load_state(repo: &Repository, state_id: &[u8]) -> Result<(ChangeId, State), Status> {
175    let id = ChangeId::try_from_slice(state_id)
176        .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))?;
177    let state = repo
178        .store()
179        .get_state(&id)
180        .map_err(to_status)?
181        .ok_or_else(|| Status::not_found(format!("state {} not found", id.to_string_full())))?;
182    Ok((id, state))
183}
184
185/// Decode a state's `DiscussionsBlob`, returning an empty blob when the
186/// state has no discussions attached yet.
187fn decode_blob_for_state(repo: &Repository, state: &State) -> Result<DiscussionsBlob, Status> {
188    let Some(hash) = state.discussions else {
189        return Ok(DiscussionsBlob::new(Vec::new()));
190    };
191    let blob = repo
192        .store()
193        .get_blob(&hash)
194        .map_err(to_status)?
195        .ok_or_else(|| {
196            Status::not_found(format!(
197                "discussions blob {} referenced by state {} is missing",
198                hash,
199                state.change_id.to_string_full()
200            ))
201        })?;
202    DiscussionsBlob::decode(blob.content())
203        .map_err(|err| Status::internal(format!("failed to decode discussions blob: {err}")))
204}
205
206/// Convenience: load both the state and its decoded `DiscussionsBlob`.
207fn load_discussions_blob(
208    repo: &Repository,
209    state_id: &ChangeId,
210) -> Result<(State, DiscussionsBlob), Status> {
211    let state = repo
212        .store()
213        .get_state(state_id)
214        .map_err(to_status)?
215        .ok_or_else(|| {
216            Status::not_found(format!("state {} not found", state_id.to_string_full()))
217        })?;
218    let blob = decode_blob_for_state(repo, &state)?;
219    Ok((state, blob))
220}
221
222/// Encode `blob`, persist it under a fresh `ContentHash`, then build and
223/// store a new `State` with the updated `discussions` pointer.
224fn save_discussions_blob(
225    repo: &Repository,
226    state: &State,
227    blob: &DiscussionsBlob,
228) -> Result<State, Status> {
229    let hash = put_discussions_blob(repo, blob)?;
230    let new_state = state.clone().with_discussions(hash);
231    repo.store().put_state(&new_state).map_err(to_status)?;
232    Ok(new_state)
233}
234
235/// Resolve the active principal using the repository's identity chain
236/// (env/repo/Git config) and fall back to a placeholder only when that lookup
237/// itself fails. We deliberately don't fail here — discussion authorship
238/// should never block on missing config.
239fn principal_for(repo: &Repository) -> Principal {
240    repo.get_principal()
241        .unwrap_or_else(|_| Principal::new("<unknown>", ""))
242}
243
244/// Resolve the HEAD state. Returns `Status::failed_precondition` when the
245/// repository has no HEAD (a fresh repo before any thread is seeded).
246fn head_state(repo: &Repository) -> Result<State, Status> {
247    let head_id = repo
248        .head()
249        .map_err(to_status)?
250        .ok_or_else(|| Status::failed_precondition("repository has no HEAD"))?;
251    repo.store()
252        .get_state(&head_id)
253        .map_err(to_status)?
254        .ok_or_else(|| {
255            Status::not_found(format!("HEAD state {} not found", head_id.to_string_full()))
256        })
257}
258
259/// Status filter for list_by_state / list_by_symbol. Empty / unknown values
260/// behave like `"all"`.
261fn status_matches(d: &Discussion, status: &str) -> bool {
262    match status {
263        "open" => d.is_open(),
264        "resolved" => !d.is_open(),
265        "orphaned" => d.orphaned,
266        // "all", "", anything else.
267        _ => true,
268    }
269}
270
271fn put_discussions_blob(repo: &Repository, blob: &DiscussionsBlob) -> Result<ContentHash, Status> {
272    let bytes = blob
273        .encode()
274        .map_err(|err| Status::internal(format!("failed to encode discussions blob: {err}")))?;
275    repo.store().put_blob(&Blob::new(bytes)).map_err(to_status)
276}
277
278fn reachable_discussions(repo: &Repository) -> Result<Vec<(ChangeId, Discussion)>, Status> {
279    let mut out = Vec::new();
280    let mut seen = HashSet::new();
281
282    if let Some(head_id) = repo.head().map_err(to_status)?
283        && let Some(head) = repo.store().get_state(&head_id).map_err(to_status)?
284    {
285        push_discussions_from_state(repo, head_id, &head, &mut seen, &mut out)?;
286    }
287
288    for state_id in repo
289        .reachable_states()
290        .map_err(|err| Status::internal(format!("walk reachable states: {err}")))?
291    {
292        let Some(state) = repo.store().get_state(&state_id).map_err(to_status)? else {
293            continue;
294        };
295        push_discussions_from_state(repo, state_id, &state, &mut seen, &mut out)?;
296    }
297
298    Ok(out)
299}
300
301fn push_discussions_from_state(
302    repo: &Repository,
303    state_id: ChangeId,
304    state: &State,
305    seen: &mut HashSet<String>,
306    out: &mut Vec<(ChangeId, Discussion)>,
307) -> Result<(), Status> {
308    let blob = decode_blob_for_state(repo, state)?;
309    for discussion in blob.discussions {
310        if seen.insert(discussion.id.clone()) {
311            out.push((state_id, discussion));
312        }
313    }
314    Ok(())
315}
316
317fn annotation_kind_from_proto(kind: i32) -> Result<AnnotationKind, Status> {
318    match ContextAnnotationKind::try_from(kind)
319        .map_err(|_| Status::invalid_argument(format!("unknown annotation kind tag {kind}")))?
320    {
321        ContextAnnotationKind::Unspecified | ContextAnnotationKind::Rationale => {
322            Ok(AnnotationKind::Rationale)
323        }
324        ContextAnnotationKind::Constraint => Ok(AnnotationKind::Constraint),
325        ContextAnnotationKind::Invariant => Ok(AnnotationKind::Invariant),
326    }
327}
328
/// Resolves the discussion at `discussion_index` by turning it into a context
/// annotation and recording the result as a new state on top of `head`.
///
/// Steps, in order:
/// 1. Validate the payload (non-empty content, known annotation kind).
/// 2. Build a symbol-scoped annotation for the discussion's anchor and append
///    it to the context blob for the anchored file.
/// 3. Mark the discussion `ResolvedIntoAnnotation` and store the updated
///    discussions blob.
/// 4. Create a child state of `head` (same tree) carrying the new context and
///    discussions pointers, then move HEAD to it.
///
/// Returns a copy of the updated discussion.
///
/// # Errors
/// - `invalid_argument` for empty content, an unknown kind, an invalid
///   target/scope, or a discussion that fails validation after the update.
/// - `internal` when attribution cannot be resolved or `discussion_index` is
///   out of range.
/// - Repository/store failures are mapped through `to_status`.
fn resolve_discussion_into_annotation(
    repo: &Repository,
    head: &State,
    discussions: &mut DiscussionsBlob,
    discussion_index: usize,
    payload: ResolveIntoAnnotation,
) -> Result<Discussion, Status> {
    if payload.content.trim().is_empty() {
        return Err(Status::invalid_argument(
            "into-annotation resolution requires non-empty content",
        ));
    }
    let kind = annotation_kind_from_proto(payload.kind)?;
    let attribution = repo
        .get_attribution()
        .map_err(|err| Status::internal(format!("resolve attribution: {err}")))?;

    // Work on a clone so the blob is only mutated once the annotation has
    // been written.
    let discussion = discussions
        .discussions
        .get(discussion_index)
        .ok_or_else(|| Status::internal("discussion index out of range"))?
        .clone();
    let target = ContextTarget::file(discussion.anchor.file.clone())
        .map_err(|err| Status::invalid_argument(err.to_string()))?;
    let mut scope = AnnotationScope::Symbol {
        name: discussion.anchor.symbol.clone(),
        resolved_lines: None,
    };
    target
        .validate_scope(&scope)
        .map_err(|err| Status::invalid_argument(err.to_string()))?;
    // Best-effort read of the anchored file under `repo.root()`; a read
    // failure is tolerated and simply leaves `source` as `None`.
    let source = target.path().and_then(|path| {
        std::fs::read(repo.root().join(path))
            .ok()
            .map(|bytes| (path.to_string(), bytes))
    });
    // Try to pin the symbol to concrete lines, then hash the scoped source.
    scope = resolve_annotation_scope(
        source
            .as_ref()
            .map(|(path, bytes)| (path.as_str(), bytes.as_slice())),
        scope,
    );
    let source_hash =
        compute_annotation_source_hash(source.as_ref().map(|(_, bytes)| bytes.as_slice()), &scope);

    let mut annotation = Annotation::new(
        scope,
        kind,
        payload.content,
        payload.tags,
        attribution.to_string(),
        now_secs(),
        source_hash,
        Some(head.change_id),
    );
    // Link the annotation back to the discussion and inherit its visibility.
    annotation.resolved_from_discussion = Some(discussion.id.clone());
    annotation.visibility = discussion.visibility.clone();
    let annotation_id = annotation.annotation_id.clone();

    // Start from the existing context blob for this target (if any) and
    // append the new annotation.
    let mut context = match head.context {
        Some(root) => repo
            .get_context_blob(&root, &target)
            .map_err(to_status)?
            .unwrap_or_else(|| ContextBlob::new(Vec::new())),
        None => ContextBlob::new(Vec::new()),
    };
    context.annotations.push(annotation);
    let context_root = repo
        .set_context_blob(head.context.as_ref(), &target, &context)
        .map_err(to_status)?;

    // NOTE(review): the context blob has already been written at this point;
    // if `validate()` below fails, that write is not rolled back here —
    // confirm whether unreferenced context objects are acceptable.
    let updated = discussions
        .discussions
        .get_mut(discussion_index)
        .ok_or_else(|| Status::internal("discussion index out of range"))?;
    updated.resolution = DiscussionResolution::ResolvedIntoAnnotation {
        annotation_id: annotation_id.clone(),
    };
    updated.resolved_annotation_id = Some(annotation_id);
    updated
        .validate()
        .map_err(|err| Status::invalid_argument(err.to_string()))?;
    // Clone to release the mutable borrow of `discussions` before storing it.
    let updated = updated.clone();

    let discussions_hash = put_discussions_blob(repo, discussions)?;
    // New state: same tree as HEAD, HEAD as the only parent, with the updated
    // context and discussions pointers.
    let mut new_state = State::new(head.tree, vec![head.change_id], attribution)
        .with_intent(format!(
            "discussion: resolve {} into annotation",
            updated.id
        ))
        .with_context(context_root)
        .with_discussions(discussions_hash);
    // Carry forward the optional side-car pointers from HEAD unchanged.
    if let Some(provenance) = head.provenance {
        new_state = new_state.with_provenance(provenance);
    }
    if let Some(risk_signals) = head.risk_signals {
        new_state = new_state.with_risk_signals(risk_signals);
    }
    if let Some(review_signatures) = head.review_signatures {
        new_state = new_state.with_review_signatures(review_signatures);
    }
    if let Some(structured_conflicts) = head.structured_conflicts {
        new_state = new_state.with_structured_conflicts(structured_conflicts);
    }
    repo.put_authored_state(&mut new_state).map_err(to_status)?;
    advance_head(repo, &new_state).map_err(to_status)?;

    Ok(updated)
}
438
439fn resolve_annotation_scope(
440    source: Option<(&str, &[u8])>,
441    scope: AnnotationScope,
442) -> AnnotationScope {
443    let AnnotationScope::Symbol {
444        name,
445        resolved_lines: None,
446    } = scope
447    else {
448        return scope;
449    };
450    let Some((path, source)) = source else {
451        return AnnotationScope::Symbol {
452            name,
453            resolved_lines: None,
454        };
455    };
456    #[cfg(feature = "semantic")]
457    {
458        match repo::symbol_resolver::resolve_symbol_lines(source, std::path::Path::new(path), &name)
459        {
460            Ok((start, end)) => AnnotationScope::Symbol {
461                name,
462                resolved_lines: Some((start, end)),
463            },
464            Err(_) => AnnotationScope::Symbol {
465                name,
466                resolved_lines: None,
467            },
468        }
469    }
470    #[cfg(not(feature = "semantic"))]
471    {
472        let _ = path;
473        let _ = source;
474        AnnotationScope::Symbol {
475            name,
476            resolved_lines: None,
477        }
478    }
479}
480
481fn compute_annotation_source_hash(
482    source: Option<&[u8]>,
483    scope: &AnnotationScope,
484) -> Option<ContentHash> {
485    let source = source?;
486    let scoped = match scope {
487        AnnotationScope::Lines(start, end) => extract_line_range(source, *start, *end),
488        AnnotationScope::Symbol {
489            resolved_lines: Some((start, end)),
490            ..
491        } => extract_line_range(source, *start, *end),
492        _ => source.to_vec(),
493    };
494    Some(ContentHash::compute(&scoped))
495}
496
/// Returns the bytes of lines `start..=end` (1-based, inclusive) of `source`.
///
/// Lines are delimited by `\n`, and each returned line keeps its trailing
/// newline when it has one. `start` is clamped up to 1 and `end` up to
/// `start`, so an inverted range yields the single `start` line. A range that
/// begins past the last line yields an empty vector; a range that runs past
/// the end is truncated at the end of `source`.
fn extract_line_range(source: &[u8], start: u32, end: u32) -> Vec<u8> {
    let first_line = start.max(1);
    let last_line = end.max(first_line);

    // Number of leading lines to drop, and number of lines to keep.
    let skipped = usize::try_from(first_line - 1).unwrap_or(usize::MAX);
    let wanted = usize::try_from(last_line - first_line)
        .unwrap_or(usize::MAX)
        .saturating_add(1);

    source
        .split_inclusive(|byte| *byte == b'\n')
        .skip(skipped)
        .take(wanted)
        .flatten()
        .copied()
        .collect()
}
527
528fn advance_head(repo: &Repository, state: &State) -> repo::Result<()> {
529    match repo.refs().read_head()? {
530        Head::Attached { thread } => repo.refs().set_thread(&thread, &state.change_id),
531        Head::Detached { .. } => repo.refs().write_head(&Head::Detached {
532            state: state.change_id,
533        }),
534    }
535}
536
537#[tonic::async_trait]
538impl DiscussionService for LocalDiscussionService {
    /// Opens a new discussion anchored to a file/symbol and attaches it to
    /// the state named by `state_id`.
    ///
    /// The work runs inside `with_idempotency`, keyed by the client operation
    /// id, the operation name `"discussion.open"`, and the encoded request.
    ///
    /// # Errors
    /// - `invalid_argument` when the anchor, its file or symbol, or the body
    ///   is missing/empty, when `state_id` or the visibility token is
    ///   invalid, or when the built discussion fails validation.
    /// - `internal` when the repository write lock cannot be taken.
    /// - Errors from loading or saving the discussions blob.
    async fn open_discussion(
        &self,
        request: Request<OpenDiscussionRequest>,
    ) -> Result<Response<ProtoDiscussion>, Status> {
        let req = request.into_inner();
        // Encoded request bytes are handed to `with_idempotency` alongside
        // the client operation id.
        let req_bytes = req.encode_to_vec();
        let client_op_id = req.client_operation_id.clone();
        let inner = self.inner.clone();

        let result = with_idempotency(
            &self.inner,
            &client_op_id,
            "discussion.open",
            &req_bytes,
            move || {
                // Clone per invocation so the closure can be called more than once.
                let req = req.clone();
                let inner = inner.clone();
                async move {
                    let repo = inner.repo();
                    let anchor_proto = req
                        .anchor
                        .clone()
                        .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
                    if anchor_proto.file.is_empty() {
                        return Err(Status::invalid_argument("anchor.file is required"));
                    }
                    if anchor_proto.symbol.is_empty() {
                        return Err(Status::invalid_argument("anchor.symbol is required"));
                    }
                    if req.body.trim().is_empty() {
                        return Err(Status::invalid_argument("body must be non-empty"));
                    }
                    let opened_against =
                        ChangeId::try_from_slice(&req.state_id).map_err(|err| {
                            Status::invalid_argument(format!("invalid state_id: {err}"))
                        })?;
                    let now = now_secs();
                    let principal = principal_for(repo);
                    // A blank visibility falls back to the repository's
                    // capture default instead of the parser's `public`.
                    let visibility = if req.visibility.trim().is_empty() {
                        repo.resolve_capture_default_visibility()
                    } else {
                        parse_visibility(&req.visibility)?
                    };
                    let discussion = Discussion {
                        id: ChangeId::generate().to_string_full(),
                        anchor: SymbolAnchor::new(anchor_proto.file, anchor_proto.symbol),
                        opened_against_state: opened_against,
                        opened_at: now,
                        thread_ref: (!req.thread_ref.is_empty()).then(|| req.thread_ref.clone()),
                        // The request body becomes the first turn of the thread.
                        turns: vec![DiscussionTurn {
                            author: principal,
                            body: req.body.clone(),
                            posted_at: now,
                        }],
                        resolution: DiscussionResolution::Open,
                        body_changed_since_open: false,
                        orphaned: false,
                        visibility,
                        resolved_annotation_id: None,
                    };
                    discussion
                        .validate()
                        .map_err(|err| Status::invalid_argument(err.to_string()))?;
                    // Hold the write lock across the load-mutate-save below;
                    // the guard lives until the end of this block.
                    let _lock = repo
                        .locker()
                        .write()
                        .map_err(|err| Status::internal(err.to_string()))?;
                    // The discussion is attached to the state it was opened
                    // against, which need not be HEAD.
                    let (state, mut blob) = load_discussions_blob(repo, &opened_against)?;
                    blob.discussions.push(discussion.clone());
                    save_discussions_blob(repo, &state, &blob)?;
                    Ok(discussion_to_proto(&discussion))
                }
            },
        )
        .await?;

        Ok(Response::new(result))
    }
617
    /// Appends a turn to a discussion stored on the HEAD state.
    ///
    /// The work runs inside `with_idempotency`, keyed by the client operation
    /// id, the operation name `"discussion.append_turn"`, and the encoded
    /// request. The turn is authored by the principal from `principal_for`
    /// and timestamped with `now_secs()`.
    ///
    /// # Errors
    /// - `invalid_argument` when `discussion_id` or the body is empty, or
    ///   when the discussion fails validation after the append.
    /// - `not_found` when HEAD's blob has no discussion with that id.
    /// - `internal` when the repository write lock cannot be taken.
    /// - Errors from resolving HEAD or loading/saving the discussions blob.
    async fn append_turn(
        &self,
        request: Request<AppendTurnRequest>,
    ) -> Result<Response<ProtoDiscussion>, Status> {
        let req = request.into_inner();
        // Encoded request bytes are handed to `with_idempotency` alongside
        // the client operation id.
        let req_bytes = req.encode_to_vec();
        let client_op_id = req.client_operation_id.clone();
        let inner = self.inner.clone();

        let result = with_idempotency(
            &self.inner,
            &client_op_id,
            "discussion.append_turn",
            &req_bytes,
            move || {
                // Clone per invocation so the closure can be called more than once.
                let req = req.clone();
                let inner = inner.clone();
                async move {
                    let repo = inner.repo();
                    if req.discussion_id.is_empty() {
                        return Err(Status::invalid_argument("discussion_id is required"));
                    }
                    if req.body.trim().is_empty() {
                        return Err(Status::invalid_argument("body must be non-empty"));
                    }
                    // This local mutation API is HEAD-scoped because the
                    // request carries no state_id. It must not pretend to find
                    // and mutate discussions across the whole repository.
                    let principal = principal_for(repo);
                    // Hold the write lock across the load-mutate-save below;
                    // the guard lives until the end of this block.
                    let _lock = repo
                        .locker()
                        .write()
                        .map_err(|err| Status::internal(err.to_string()))?;
                    let head = head_state(repo)?;
                    let mut blob = decode_blob_for_state(repo, &head)?;
                    let idx = blob
                        .discussions
                        .iter()
                        .position(|d| d.id == req.discussion_id)
                        .ok_or_else(|| {
                            Status::not_found(format!("discussion {} not found", req.discussion_id))
                        })?;
                    // `idx` comes from `position` above, so indexing cannot go out of range.
                    blob.discussions[idx].turns.push(DiscussionTurn {
                        author: principal,
                        body: req.body.clone(),
                        posted_at: now_secs(),
                    });
                    blob.discussions[idx]
                        .validate()
                        .map_err(|err| Status::invalid_argument(err.to_string()))?;
                    let updated = blob.discussions[idx].clone();
                    save_discussions_blob(repo, &head, &blob)?;
                    Ok(discussion_to_proto(&updated))
                }
            },
        )
        .await?;

        Ok(Response::new(result))
    }
678
    /// Resolves a discussion stored on the HEAD state.
    ///
    /// Three resolution modes are supported:
    /// - `IntoAnnotation` delegates to `resolve_discussion_into_annotation`,
    ///   which persists its own result and returns early from this handler.
    /// - `ByEdit` records the state id that resolved the discussion.
    /// - `Dismissed` records the (non-empty) dismissal reason.
    ///
    /// `ByEdit` and `Dismissed` update the discussion in HEAD's blob and save
    /// it through `save_discussions_blob`. The work runs inside
    /// `with_idempotency` under the operation name `"discussion.resolve"`.
    ///
    /// # Errors
    /// - `invalid_argument` when `discussion_id` is empty, no resolution mode
    ///   is given, the dismissal reason is blank, the `ByEdit` state id is
    ///   invalid, or the discussion fails validation after the update.
    /// - `not_found` when HEAD's blob has no discussion with that id.
    /// - `internal` when the repository write lock cannot be taken.
    /// - Errors from resolving HEAD or loading/saving the discussions blob.
    async fn resolve_discussion(
        &self,
        request: Request<ResolveDiscussionRequest>,
    ) -> Result<Response<ProtoDiscussion>, Status> {
        let req = request.into_inner();
        // Encoded request bytes are handed to `with_idempotency` alongside
        // the client operation id.
        let req_bytes = req.encode_to_vec();
        let client_op_id = req.client_operation_id.clone();
        let inner = self.inner.clone();

        let result = with_idempotency(
            &self.inner,
            &client_op_id,
            "discussion.resolve",
            &req_bytes,
            move || {
                // Clone per invocation so the closure can be called more than once.
                let req = req.clone();
                let inner = inner.clone();
                async move {
                    let repo = inner.repo();
                    if req.discussion_id.is_empty() {
                        return Err(Status::invalid_argument("discussion_id is required"));
                    }
                    // This local mutation API is HEAD-scoped because the
                    // request carries no state_id. It must not pretend to find
                    // and mutate discussions across the whole repository.
                    use grpc::heddle::v1::resolve_discussion_request::Resolution;
                    let resolution = req
                        .resolution
                        .clone()
                        .ok_or_else(|| Status::invalid_argument("resolution mode is required"))?;
                    // Reject a blank dismissal reason before taking the lock.
                    if let Resolution::Dismissed(ref payload) = resolution
                        && payload.reason.trim().is_empty()
                    {
                        return Err(Status::invalid_argument(
                            "dismissal requires a non-empty reason",
                        ));
                    }

                    // Hold the write lock across the load-mutate-save below;
                    // the guard lives until the end of this block.
                    let _lock = repo
                        .locker()
                        .write()
                        .map_err(|err| Status::internal(err.to_string()))?;
                    let head = head_state(repo)?;
                    let mut blob = decode_blob_for_state(repo, &head)?;
                    let idx = blob
                        .discussions
                        .iter()
                        .position(|d| d.id == req.discussion_id)
                        .ok_or_else(|| {
                            Status::not_found(format!("discussion {} not found", req.discussion_id))
                        })?;

                    match resolution {
                        Resolution::IntoAnnotation(payload) => {
                            // This path stores the blob and advances HEAD
                            // itself, so it skips the shared save below.
                            let updated = resolve_discussion_into_annotation(
                                repo, &head, &mut blob, idx, payload,
                            )?;
                            return Ok(discussion_to_proto(&updated));
                        }
                        Resolution::ByEdit(payload) => {
                            let state_id =
                                ChangeId::try_from_slice(&payload.state_id).map_err(|err| {
                                    Status::invalid_argument(format!("invalid state_id: {err}"))
                                })?;
                            blob.discussions[idx].resolution =
                                DiscussionResolution::ResolvedByEdit { state_id };
                        }
                        Resolution::Dismissed(payload) => {
                            blob.discussions[idx].resolution = DiscussionResolution::Dismissed {
                                reason: payload.reason,
                            };
                        }
                    }

                    // Shared tail for ByEdit / Dismissed: validate, then save.
                    blob.discussions[idx]
                        .validate()
                        .map_err(|err| Status::invalid_argument(err.to_string()))?;
                    let updated = blob.discussions[idx].clone();
                    save_discussions_blob(repo, &head, &blob)?;
                    Ok(discussion_to_proto(&updated))
                }
            },
        )
        .await?;

        Ok(Response::new(result))
    }
766
767    async fn list_by_state(
768        &self,
769        request: Request<ListDiscussionsByStateRequest>,
770    ) -> Result<Response<ListDiscussionsResponse>, Status> {
771        let req = request.into_inner();
772        let repo = self.inner.repo();
773        let (_, state) = load_state(repo, &req.state_id)?;
774        let blob = decode_blob_for_state(repo, &state)?;
775        let discussions = blob
776            .discussions
777            .iter()
778            .filter(|d| status_matches(d, &req.status))
779            .map(discussion_to_proto)
780            .collect();
781        Ok(Response::new(ListDiscussionsResponse { discussions }))
782    }
783
784    async fn list_by_symbol(
785        &self,
786        request: Request<ListDiscussionsBySymbolRequest>,
787    ) -> Result<Response<ListDiscussionsResponse>, Status> {
788        let req = request.into_inner();
789        let anchor = req
790            .anchor
791            .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
792        if anchor.file.is_empty() || anchor.symbol.is_empty() {
793            return Err(Status::invalid_argument(
794                "anchor.file and anchor.symbol are required",
795            ));
796        }
797        let repo = self.inner.repo();
798        let discussions = reachable_discussions(repo)?
799            .into_iter()
800            .map(|(_, discussion)| discussion)
801            .filter(|discussion| {
802                discussion.anchor.file == anchor.file
803                    && discussion.anchor.symbol == anchor.symbol
804                    && status_matches(discussion, &req.status)
805            })
806            .map(|discussion| discussion_to_proto(&discussion))
807            .collect();
808        Ok(Response::new(ListDiscussionsResponse { discussions }))
809    }
810
811    async fn get_discussion(
812        &self,
813        request: Request<GetDiscussionRequest>,
814    ) -> Result<Response<ProtoDiscussion>, Status> {
815        let req = request.into_inner();
816        if req.discussion_id.is_empty() {
817            return Err(Status::invalid_argument("discussion_id is required"));
818        }
819        // Default: HEAD first, then reachable states. Optional `state_id`
820        // (#836) resolves the discussion against a specific prior state.
821        let repo = self.inner.repo();
822        if req.state_id.is_empty() {
823            let discussion = reachable_discussions(repo)?
824                .into_iter()
825                .map(|(_, discussion)| discussion)
826                .find(|discussion| discussion.id == req.discussion_id)
827                .ok_or_else(|| {
828                    Status::not_found(format!("discussion {} not found", req.discussion_id))
829                })?;
830            return Ok(Response::new(discussion_to_proto(&discussion)));
831        }
832
833        let state = load_state(repo, &req.state_id)?.1;
834        let blob = decode_blob_for_state(repo, &state)?;
835        let discussion = blob
836            .discussions
837            .iter()
838            .find(|d| d.id == req.discussion_id)
839            .ok_or_else(|| {
840                Status::not_found(format!("discussion {} not found", req.discussion_id))
841            })?;
842        Ok(Response::new(discussion_to_proto(discussion)))
843    }
844}
845
846#[cfg(test)]
847mod tests {
848    use std::sync::Arc;
849
850    use objects::object::{Attribution, Principal};
851    use repo::{Repository, operation_dedup::OperationDedupStore};
852    use tempfile::TempDir;
853
854    use super::*;
855
856    fn fresh_service() -> (TempDir, ChangeId, LocalDiscussionService) {
857        let temp = TempDir::new().unwrap();
858        let repo = Repository::init_default(temp.path()).unwrap();
859        // Take a snapshot so we have a real state to anchor discussions against.
860        let attribution = Attribution::human(Principal::new("Tester", "tester@example.com"));
861        let state = repo
862            .snapshot_with_attribution(Some("seed".into()), None, attribution)
863            .unwrap();
864        let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
865        let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
866        let svc = LocalDiscussionService::new(inner);
867        (temp, state.change_id, svc)
868    }
869
870    fn open_request(state_id: &ChangeId, body: &str, op_id: &str) -> OpenDiscussionRequest {
871        OpenDiscussionRequest {
872            repo_path: String::new(),
873            state_id: state_id.as_bytes().to_vec(),
874            anchor: Some(PathSymbolRef {
875                file: "src/lib.rs".into(),
876                symbol: "foo".into(),
877            }),
878            body: body.into(),
879            visibility: String::new(),
880            thread_ref: String::new(),
881            client_operation_id: op_id.into(),
882        }
883    }
884
885    #[tokio::test]
886    #[serial_test::serial(process_global)]
887    async fn open_then_append_turn_persists_both_turns() {
888        let (_t, state_id, svc) = fresh_service();
889        let opened = svc
890            .open_discussion(Request::new(open_request(&state_id, "first", "")))
891            .await
892            .unwrap()
893            .into_inner();
894        assert_eq!(opened.turns.len(), 1);
895        assert_eq!(opened.turns[0].body, "first");
896
897        let appended = svc
898            .append_turn(Request::new(AppendTurnRequest {
899                repo_path: String::new(),
900                discussion_id: opened.id.clone(),
901                body: "second".into(),
902                client_operation_id: String::new(),
903            }))
904            .await
905            .unwrap()
906            .into_inner();
907        assert_eq!(appended.turns.len(), 2);
908        assert_eq!(appended.turns[0].body, "first");
909        assert_eq!(appended.turns[1].body, "second");
910
911        // Confirm the on-disk state actually carries both turns: re-list.
912        let listed = svc
913            .list_by_state(Request::new(ListDiscussionsByStateRequest {
914                repo_path: String::new(),
915                state_id: state_id.as_bytes().to_vec(),
916                status: "all".into(),
917            }))
918            .await
919            .unwrap()
920            .into_inner();
921        // The discussion was attached to the original state so list_by_state
922        // on that state still finds it.
923        assert_eq!(listed.discussions.len(), 1);
924        assert_eq!(listed.discussions[0].turns.len(), 2);
925    }
926
927    #[tokio::test]
928    #[serial_test::serial(process_global)]
929    async fn open_idempotent_returns_same_discussion() {
930        let (_t, state_id, svc) = fresh_service();
931        let op_id = "11111111-2222-3333-4444-555555555555";
932        let first = svc
933            .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
934            .await
935            .unwrap()
936            .into_inner();
937        let second = svc
938            .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
939            .await
940            .unwrap()
941            .into_inner();
942        assert_eq!(first.id, second.id);
943        assert_eq!(first.turns.len(), 1);
944        assert_eq!(second.turns.len(), 1);
945    }
946
947    #[tokio::test]
948    #[serial_test::serial(process_global)]
949    async fn open_discussion_serializes_concurrent_appends() {
950        // Regression for the lost-update race: two OpenDiscussions with
951        // different operation ids could both read the same base
952        // `DiscussionsBlob`, then the second `put_state` would clobber the
953        // first discussion. The fix wraps the read-modify-write in
954        // `repo.locker().write()` and re-loads the blob inside the lock.
955        let (_t, state_id, svc) = fresh_service();
956        let op_a = objects::object::OperationId::new().to_string();
957        let op_b = objects::object::OperationId::new().to_string();
958        let mut req_a = open_request(&state_id, "body a", &op_a);
959        req_a.anchor.as_mut().unwrap().symbol = "sym_a".into();
960        let mut req_b = open_request(&state_id, "body b", &op_b);
961        req_b.anchor.as_mut().unwrap().symbol = "sym_b".into();
962
963        let svc_a = svc.clone();
964        let svc_b = svc.clone();
965        let (a, b) = tokio::join!(
966            svc_a.open_discussion(Request::new(req_a)),
967            svc_b.open_discussion(Request::new(req_b)),
968        );
969        a.expect("first open_discussion");
970        b.expect("second open_discussion");
971
972        let listed = svc
973            .list_by_state(Request::new(ListDiscussionsByStateRequest {
974                repo_path: String::new(),
975                state_id: state_id.as_bytes().to_vec(),
976                status: "all".into(),
977            }))
978            .await
979            .expect("list_by_state");
980        assert_eq!(
981            listed.get_ref().discussions.len(),
982            2,
983            "both concurrent discussions must land — neither should be lost \
984             to a stale-blob clobber"
985        );
986    }
987
988    #[tokio::test]
989    #[serial_test::serial(process_global)]
990    async fn open_rejects_labelled_visibility_tiers_without_labels() {
991        let (_t, state_id, svc) = fresh_service();
992
993        for visibility in [
994            "team",
995            "team:",
996            "team_scoped",
997            "restricted",
998            "restricted:",
999            "private",
1000            "private:",
1001            "unknown",
1002        ] {
1003            let mut req = open_request(&state_id, "hello", "");
1004            req.visibility = visibility.into();
1005            let err = svc.open_discussion(Request::new(req)).await.unwrap_err();
1006            assert_eq!(err.code(), tonic::Code::InvalidArgument);
1007        }
1008    }
1009
1010    #[tokio::test]
1011    #[serial_test::serial(process_global)]
1012    async fn open_round_trips_supported_visibility_tiers() {
1013        let (_t, state_id, svc) = fresh_service();
1014
1015        for (visibility, expected) in [
1016            ("", "public"),
1017            ("public", "public"),
1018            ("internal", "internal"),
1019            ("team:platform", "team:platform"),
1020            ("restricted:legal", "restricted:legal"),
1021            ("private:embargo-x", "private:embargo-x"),
1022        ] {
1023            let mut req = open_request(&state_id, "hello", "");
1024            req.visibility = visibility.into();
1025            let opened = svc
1026                .open_discussion(Request::new(req))
1027                .await
1028                .unwrap()
1029                .into_inner();
1030            assert_eq!(opened.visibility, expected);
1031        }
1032    }
1033
1034    #[tokio::test]
1035    #[serial_test::serial(process_global)]
1036    async fn open_empty_visibility_uses_repo_discussion_default() {
1037        let temp = TempDir::new().unwrap();
1038        Repository::init_default(temp.path()).unwrap();
1039        std::fs::write(
1040            temp.path().join(".heddle/config.toml"),
1041            "[repository]\nversion = 1\n\n[review.discussion]\ndefault_visibility = \"Internal\"\n",
1042        )
1043        .unwrap();
1044        let repo = Repository::open(temp.path()).unwrap();
1045        let attribution = Attribution::human(Principal::new("Tester", "tester@example.com"));
1046        let state = repo
1047            .snapshot_with_attribution(Some("seed".into()), None, attribution)
1048            .unwrap();
1049        let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
1050        let inner = GrpcLocalService::new(Arc::new(repo), Arc::new(dedup));
1051        let svc = LocalDiscussionService::new(inner);
1052
1053        let opened = svc
1054            .open_discussion(Request::new(open_request(&state.change_id, "hello", "")))
1055            .await
1056            .unwrap()
1057            .into_inner();
1058
1059        assert_eq!(opened.visibility, "internal");
1060    }
1061
1062    #[tokio::test]
1063    #[serial_test::serial(process_global)]
1064    async fn resolve_dismissed_with_empty_reason_is_invalid_argument() {
1065        let (_t, state_id, svc) = fresh_service();
1066        let opened = svc
1067            .open_discussion(Request::new(open_request(&state_id, "why?", "")))
1068            .await
1069            .unwrap()
1070            .into_inner();
1071
1072        use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveDismissed};
1073        let err = svc
1074            .resolve_discussion(Request::new(ResolveDiscussionRequest {
1075                repo_path: String::new(),
1076                discussion_id: opened.id,
1077                resolution: Some(Resolution::Dismissed(ResolveDismissed {
1078                    reason: "   ".into(),
1079                })),
1080                client_operation_id: String::new(),
1081            }))
1082            .await
1083            .unwrap_err();
1084        assert_eq!(err.code(), tonic::Code::InvalidArgument);
1085    }
1086
1087    #[tokio::test]
1088    #[serial_test::serial(process_global)]
1089    async fn resolve_into_annotation_creates_context_and_resolves_discussion() {
1090        let (_t, state_id, svc) = fresh_service();
1091        let opened = svc
1092            .open_discussion(Request::new(open_request(&state_id, "why?", "")))
1093            .await
1094            .unwrap()
1095            .into_inner();
1096
1097        use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveIntoAnnotation};
1098        let resolved = svc
1099            .resolve_discussion(Request::new(ResolveDiscussionRequest {
1100                repo_path: String::new(),
1101                discussion_id: opened.id.clone(),
1102                resolution: Some(Resolution::IntoAnnotation(ResolveIntoAnnotation {
1103                    kind: ContextAnnotationKind::Rationale as i32,
1104                    content: "capture this".into(),
1105                    tags: vec!["todo".into()],
1106                })),
1107                client_operation_id: String::new(),
1108            }))
1109            .await
1110            .unwrap()
1111            .into_inner();
1112        let annotation_id = resolved.resolved_annotation_id.clone();
1113        assert!(
1114            !annotation_id.is_empty(),
1115            "into-annotation resolution should return the created annotation id"
1116        );
1117
1118        let repo = svc.inner.repo();
1119        let head_id = repo.head().unwrap().unwrap();
1120        assert_ne!(
1121            head_id, state_id,
1122            "resolving into context should create and publish a new HEAD state"
1123        );
1124        let head = repo.store().get_state(&head_id).unwrap().unwrap();
1125        let context_root = head.context.expect("new state should carry context");
1126        let (target, context, index) = repo
1127            .find_annotation(&context_root, &annotation_id)
1128            .unwrap()
1129            .expect("created annotation should be indexed in the context tree");
1130        assert_eq!(target.path(), Some("src/lib.rs"));
1131        let annotation = &context.annotations[index];
1132        assert_eq!(
1133            annotation.resolved_from_discussion.as_deref(),
1134            Some(opened.id.as_str())
1135        );
1136        assert_eq!(
1137            annotation.current_revision().unwrap().content,
1138            "capture this"
1139        );
1140        assert_eq!(
1141            annotation.current_revision().unwrap().tags,
1142            vec!["todo".to_string()]
1143        );
1144
1145        let discussion_blob = decode_blob_for_state(repo, &head).unwrap();
1146        let stored = discussion_blob
1147            .discussions
1148            .iter()
1149            .find(|discussion| discussion.id == opened.id)
1150            .expect("resolved discussion should still be present on new HEAD");
1151        assert_eq!(
1152            stored.resolved_annotation_id.as_deref(),
1153            Some(annotation_id.as_str())
1154        );
1155        assert!(matches!(
1156            stored.resolution,
1157            DiscussionResolution::ResolvedIntoAnnotation { .. }
1158        ));
1159    }
1160
1161    #[tokio::test]
1162    #[serial_test::serial(process_global)]
1163    async fn list_by_symbol_finds_reachable_discussions() {
1164        let (_t, state_id, svc) = fresh_service();
1165        let opened = svc
1166            .open_discussion(Request::new(open_request(&state_id, "symbol thread", "")))
1167            .await
1168            .unwrap()
1169            .into_inner();
1170
1171        let listed = svc
1172            .list_by_symbol(Request::new(ListDiscussionsBySymbolRequest {
1173                repo_path: String::new(),
1174                anchor: Some(PathSymbolRef {
1175                    file: "src/lib.rs".into(),
1176                    symbol: "foo".into(),
1177                }),
1178                status: "all".into(),
1179            }))
1180            .await
1181            .unwrap()
1182            .into_inner();
1183        assert_eq!(listed.discussions.len(), 1);
1184        assert_eq!(listed.discussions[0].id, opened.id);
1185    }
1186
1187    #[tokio::test]
1188    #[serial_test::serial(process_global)]
1189    async fn get_discussion_without_state_scans_reachable_discussions() {
1190        let (temp, state_id, svc) = fresh_service();
1191        std::fs::write(temp.path().join("later.txt"), "later\n").unwrap();
1192        svc.inner
1193            .repo()
1194            .snapshot(Some("later".into()), None)
1195            .expect("advance HEAD");
1196
1197        let opened = svc
1198            .open_discussion(Request::new(open_request(&state_id, "old state", "")))
1199            .await
1200            .unwrap()
1201            .into_inner();
1202
1203        let fetched = svc
1204            .get_discussion(Request::new(GetDiscussionRequest {
1205                repo_path: String::new(),
1206                discussion_id: opened.id.clone(),
1207                state_id: Vec::new(),
1208            }))
1209            .await
1210            .unwrap()
1211            .into_inner();
1212        assert_eq!(fetched.id, opened.id);
1213        assert_eq!(fetched.turns[0].body, "old state");
1214    }
1215
1216    #[tokio::test]
1217    #[serial_test::serial(process_global)]
1218    async fn list_by_state_filters_by_status() {
1219        let (_t, state_id, svc) = fresh_service();
1220        // Open two discussions, dismiss one of them.
1221        let a = svc
1222            .open_discussion(Request::new(open_request(&state_id, "a", "")))
1223            .await
1224            .unwrap()
1225            .into_inner();
1226        let _b = svc
1227            .open_discussion(Request::new(open_request(&state_id, "b", "")))
1228            .await
1229            .unwrap()
1230            .into_inner();
1231
1232        use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveDismissed};
1233        svc.resolve_discussion(Request::new(ResolveDiscussionRequest {
1234            repo_path: String::new(),
1235            discussion_id: a.id.clone(),
1236            resolution: Some(Resolution::Dismissed(ResolveDismissed {
1237                reason: "no longer relevant".into(),
1238            })),
1239            client_operation_id: String::new(),
1240        }))
1241        .await
1242        .unwrap();
1243
1244        // The dismissal mutates the HEAD state's blob, not the original
1245        // state's blob. So `list_by_state(state_id, "open")` should still
1246        // see two open discussions on the *original* state_id (since the
1247        // resolve wrote to HEAD, which advanced past state_id only when a
1248        // new snapshot was taken — in our test repo HEAD is still
1249        // state_id from `seed`).
1250        //
1251        // To make a deterministic assertion regardless of HEAD movement
1252        // we instead query the HEAD state, which is where resolve_*
1253        // wrote its mutation. We rely on `repo.head()` matching
1254        // `state_id` because we never took an additional snapshot.
1255        let head_state_id = state_id.as_bytes().to_vec();
1256        let open_only = svc
1257            .list_by_state(Request::new(ListDiscussionsByStateRequest {
1258                repo_path: String::new(),
1259                state_id: head_state_id.clone(),
1260                status: "open".into(),
1261            }))
1262            .await
1263            .unwrap()
1264            .into_inner();
1265        assert_eq!(open_only.discussions.len(), 1);
1266        assert_eq!(open_only.discussions[0].turns[0].body, "b");
1267
1268        let resolved_only = svc
1269            .list_by_state(Request::new(ListDiscussionsByStateRequest {
1270                repo_path: String::new(),
1271                state_id: head_state_id.clone(),
1272                status: "resolved".into(),
1273            }))
1274            .await
1275            .unwrap()
1276            .into_inner();
1277        assert_eq!(resolved_only.discussions.len(), 1);
1278        assert_eq!(resolved_only.discussions[0].turns[0].body, "a");
1279
1280        let all = svc
1281            .list_by_state(Request::new(ListDiscussionsByStateRequest {
1282                repo_path: String::new(),
1283                state_id: head_state_id,
1284                status: "all".into(),
1285            }))
1286            .await
1287            .unwrap()
1288            .into_inner();
1289        assert_eq!(all.discussions.len(), 2);
1290    }
1291}