Skip to main content

daemon/grpc_local_impl/
discussion.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local gRPC service for the W2 `DiscussionService`.
3//!
4//! Reads and writes the `DiscussionsBlob` attached to a state via
5//! [`State::with_discussions`]. Open / append / resolve mutations all follow
6//! the same pattern: load the current state, decode (or create fresh) the
7//! existing blob, mutate, encode back to a new [`Blob`], persist a new
8//! `State` with the updated `discussions` content hash.
9//!
10//! Discovery RPCs (lookup by id, lookup by symbol) currently scan the HEAD
11//! state's blob only. A cross-state index is W2 follow-up work and is
12//! flagged with `// TODO(W2-followup):` comments.
13
14use grpc::heddle::v1::{
15    AppendTurnRequest, Discussion as ProtoDiscussion,
16    DiscussionResolution as ProtoDiscussionResolution, DiscussionTurn as ProtoDiscussionTurn,
17    GetDiscussionRequest, ListDiscussionsByStateRequest, ListDiscussionsBySymbolRequest,
18    ListDiscussionsResponse, OpenDiscussionRequest, PathSymbolRef, ResolveDiscussionRequest,
19    discussion_service_server::DiscussionService,
20};
21use objects::{
22    object::{
23        Blob, ChangeId, Discussion, DiscussionResolution, DiscussionTurn, DiscussionsBlob,
24        Principal, State, SymbolAnchor, VisibilityTier,
25    },
26    store::ObjectStore,
27};
28use prost::Message;
29use repo::Repository;
30use tonic::{Request, Response, Status};
31
32use super::{GrpcLocalService, to_status, with_idempotency};
33
34#[derive(Clone)]
35pub struct LocalDiscussionService {
36    inner: GrpcLocalService,
37}
38
39impl LocalDiscussionService {
40    pub fn new(inner: GrpcLocalService) -> Self {
41        Self { inner }
42    }
43}
44
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Never fails: when the system clock reports a time before the epoch the
/// function returns `0`. The `u64 -> i64` conversion is checked and saturates
/// at `i64::MAX` rather than silently wrapping to a negative value.
fn now_secs() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX),
        // Clock is set before 1970: best-effort fallback rather than an error.
        Err(_) => 0,
    }
}
51
52/// Wire vocabulary mirrors `VisibilityTier::as_str`. Empty / unknown
53/// strings collapse to `Public` (the proto convention is "empty means
54/// default"). `team_scoped` and `restricted` round-trip through this path
55/// without an associated label — they're admitted for forward-compat with
56/// the namespace policy override path; callers wanting a labelled value
57/// must go through a richer surface that doesn't yet exist on this RPC.
58fn parse_visibility(s: &str) -> VisibilityTier {
59    match s {
60        "internal" => VisibilityTier::Internal,
61        "team_scoped" => VisibilityTier::TeamScoped {
62            team_id: String::new(),
63        },
64        "restricted" => VisibilityTier::Restricted {
65            scope_label: String::new(),
66        },
67        // "public", "", or anything else.
68        _ => VisibilityTier::Public,
69    }
70}
71
72fn turn_to_proto(turn: &DiscussionTurn) -> ProtoDiscussionTurn {
73    ProtoDiscussionTurn {
74        author_name: turn.author.name.clone(),
75        author_email: turn.author.email.clone(),
76        body: turn.body.clone(),
77        posted_at: Some(prost_types::Timestamp {
78            seconds: turn.posted_at,
79            nanos: 0,
80        }),
81    }
82}
83
84fn resolution_to_proto(resolution: &DiscussionResolution) -> ProtoDiscussionResolution {
85    use grpc::heddle::v1::discussion_resolution::{
86        Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
87    };
88    let state = match resolution {
89        DiscussionResolution::Open => State::Open(Open {}),
90        DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
91            State::IntoAnnotation(ResolvedIntoAnnotation {
92                annotation_id: annotation_id.clone(),
93            })
94        }
95        DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
96            state_id: state_id.as_bytes().to_vec(),
97        }),
98        DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
99            reason: reason.clone(),
100        }),
101    };
102    ProtoDiscussionResolution { state: Some(state) }
103}
104
105fn discussion_to_proto(d: &Discussion) -> ProtoDiscussion {
106    ProtoDiscussion {
107        id: d.id.clone(),
108        anchor: Some(PathSymbolRef {
109            file: d.anchor.file.clone(),
110            symbol: d.anchor.symbol.clone(),
111        }),
112        opened_against_state: d.opened_against_state.as_bytes().to_vec(),
113        opened_at: Some(prost_types::Timestamp {
114            seconds: d.opened_at,
115            nanos: 0,
116        }),
117        thread_ref: d.thread_ref.clone().unwrap_or_default(),
118        turns: d.turns.iter().map(turn_to_proto).collect(),
119        resolution: Some(resolution_to_proto(&d.resolution)),
120        body_changed_since_open: d.body_changed_since_open,
121        orphaned: d.orphaned,
122        visibility: d.visibility.as_str().to_string(),
123        resolved_annotation_id: d.resolved_annotation_id.clone().unwrap_or_default(),
124    }
125}
126
127/// Resolve a `state_id` string to a stored `State`, returning the parsed
128/// `ChangeId` and the loaded `State`.
129fn load_state(repo: &Repository, state_id: &[u8]) -> Result<(ChangeId, State), Status> {
130    let id = ChangeId::try_from_slice(state_id)
131        .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))?;
132    let state = repo
133        .store()
134        .get_state(&id)
135        .map_err(to_status)?
136        .ok_or_else(|| Status::not_found(format!("state {} not found", id.to_string_full())))?;
137    Ok((id, state))
138}
139
140/// Decode a state's `DiscussionsBlob`, returning an empty blob when the
141/// state has no discussions attached yet.
142fn decode_blob_for_state(repo: &Repository, state: &State) -> Result<DiscussionsBlob, Status> {
143    let Some(hash) = state.discussions else {
144        return Ok(DiscussionsBlob::new(Vec::new()));
145    };
146    let blob = repo
147        .store()
148        .get_blob(&hash)
149        .map_err(to_status)?
150        .ok_or_else(|| {
151            Status::not_found(format!(
152                "discussions blob {} referenced by state {} is missing",
153                hash,
154                state.change_id.to_string_full()
155            ))
156        })?;
157    DiscussionsBlob::decode(blob.content())
158        .map_err(|err| Status::internal(format!("failed to decode discussions blob: {err}")))
159}
160
161/// Convenience: load both the state and its decoded `DiscussionsBlob`.
162fn load_discussions_blob(
163    repo: &Repository,
164    state_id: &ChangeId,
165) -> Result<(State, DiscussionsBlob), Status> {
166    let state = repo
167        .store()
168        .get_state(state_id)
169        .map_err(to_status)?
170        .ok_or_else(|| {
171            Status::not_found(format!("state {} not found", state_id.to_string_full()))
172        })?;
173    let blob = decode_blob_for_state(repo, &state)?;
174    Ok((state, blob))
175}
176
177/// Encode `blob`, persist it under a fresh `ContentHash`, then build and
178/// store a new `State` with the updated `discussions` pointer.
179fn save_discussions_blob(
180    repo: &Repository,
181    state: &State,
182    blob: &DiscussionsBlob,
183) -> Result<State, Status> {
184    let bytes = blob
185        .encode()
186        .map_err(|err| Status::internal(format!("failed to encode discussions blob: {err}")))?;
187    let hash = repo
188        .store()
189        .put_blob(&Blob::new(bytes))
190        .map_err(to_status)?;
191    let new_state = state.clone().with_discussions(hash);
192    repo.store().put_state(&new_state).map_err(to_status)?;
193    Ok(new_state)
194}
195
196/// Resolve the active principal using the repository's identity chain
197/// (env/repo/Git config) and fall back to a placeholder only when that lookup
198/// itself fails. We deliberately don't fail here — discussion authorship
199/// should never block on missing config.
200fn principal_for(repo: &Repository) -> Principal {
201    repo.get_principal()
202        .unwrap_or_else(|_| Principal::new("<unknown>", ""))
203}
204
205/// Resolve the HEAD state. Returns `Status::failed_precondition` when the
206/// repository has no HEAD (a fresh repo before any thread is seeded).
207fn head_state(repo: &Repository) -> Result<State, Status> {
208    let head_id = repo
209        .head()
210        .map_err(to_status)?
211        .ok_or_else(|| Status::failed_precondition("repository has no HEAD"))?;
212    repo.store()
213        .get_state(&head_id)
214        .map_err(to_status)?
215        .ok_or_else(|| {
216            Status::not_found(format!("HEAD state {} not found", head_id.to_string_full()))
217        })
218}
219
220/// Status filter for list_by_state / list_by_symbol. Empty / unknown values
221/// behave like `"all"`.
222fn status_matches(d: &Discussion, status: &str) -> bool {
223    match status {
224        "open" => d.is_open(),
225        "resolved" => !d.is_open(),
226        "orphaned" => d.orphaned,
227        // "all", "", anything else.
228        _ => true,
229    }
230}
231
232#[tonic::async_trait]
233impl DiscussionService for LocalDiscussionService {
234    async fn open_discussion(
235        &self,
236        request: Request<OpenDiscussionRequest>,
237    ) -> Result<Response<ProtoDiscussion>, Status> {
238        let req = request.into_inner();
239        let req_bytes = req.encode_to_vec();
240        let client_op_id = req.client_operation_id.clone();
241        let inner = self.inner.clone();
242
243        let result = with_idempotency(
244            &self.inner,
245            &client_op_id,
246            "discussion.open",
247            &req_bytes,
248            move || {
249                let req = req.clone();
250                let inner = inner.clone();
251                async move {
252                    let repo = inner.repo();
253                    let anchor_proto = req
254                        .anchor
255                        .clone()
256                        .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
257                    if anchor_proto.file.is_empty() {
258                        return Err(Status::invalid_argument("anchor.file is required"));
259                    }
260                    if anchor_proto.symbol.is_empty() {
261                        return Err(Status::invalid_argument("anchor.symbol is required"));
262                    }
263                    if req.body.trim().is_empty() {
264                        return Err(Status::invalid_argument("body must be non-empty"));
265                    }
266                    let opened_against =
267                        ChangeId::try_from_slice(&req.state_id).map_err(|err| {
268                            Status::invalid_argument(format!("invalid state_id: {err}"))
269                        })?;
270                    let (state, mut blob) = load_discussions_blob(repo, &opened_against)?;
271                    let now = now_secs();
272                    let principal = principal_for(repo);
273                    let discussion = Discussion {
274                        id: ChangeId::generate().to_string_full(),
275                        anchor: SymbolAnchor::new(anchor_proto.file, anchor_proto.symbol),
276                        opened_against_state: opened_against,
277                        opened_at: now,
278                        thread_ref: (!req.thread_ref.is_empty()).then(|| req.thread_ref.clone()),
279                        turns: vec![DiscussionTurn {
280                            author: principal,
281                            body: req.body.clone(),
282                            posted_at: now,
283                        }],
284                        resolution: DiscussionResolution::Open,
285                        body_changed_since_open: false,
286                        orphaned: false,
287                        visibility: parse_visibility(&req.visibility),
288                        resolved_annotation_id: None,
289                    };
290                    discussion
291                        .validate()
292                        .map_err(|err| Status::invalid_argument(err.to_string()))?;
293                    blob.discussions.push(discussion.clone());
294                    save_discussions_blob(repo, &state, &blob)?;
295                    Ok(discussion_to_proto(&discussion))
296                }
297            },
298        )
299        .await?;
300
301        Ok(Response::new(result))
302    }
303
304    async fn append_turn(
305        &self,
306        request: Request<AppendTurnRequest>,
307    ) -> Result<Response<ProtoDiscussion>, Status> {
308        let req = request.into_inner();
309        let req_bytes = req.encode_to_vec();
310        let client_op_id = req.client_operation_id.clone();
311        let inner = self.inner.clone();
312
313        let result = with_idempotency(
314            &self.inner,
315            &client_op_id,
316            "discussion.append_turn",
317            &req_bytes,
318            move || {
319                let req = req.clone();
320                let inner = inner.clone();
321                async move {
322                    let repo = inner.repo();
323                    if req.discussion_id.is_empty() {
324                        return Err(Status::invalid_argument("discussion_id is required"));
325                    }
326                    if req.body.trim().is_empty() {
327                        return Err(Status::invalid_argument("body must be non-empty"));
328                    }
329                    // TODO(W2-followup): scan all states / oplog instead of HEAD-only.
330                    let head = head_state(repo)?;
331                    let mut blob = decode_blob_for_state(repo, &head)?;
332                    let idx = blob
333                        .discussions
334                        .iter()
335                        .position(|d| d.id == req.discussion_id)
336                        .ok_or_else(|| {
337                            Status::not_found(format!("discussion {} not found", req.discussion_id))
338                        })?;
339                    let principal = principal_for(repo);
340                    blob.discussions[idx].turns.push(DiscussionTurn {
341                        author: principal,
342                        body: req.body.clone(),
343                        posted_at: now_secs(),
344                    });
345                    blob.discussions[idx]
346                        .validate()
347                        .map_err(|err| Status::invalid_argument(err.to_string()))?;
348                    let updated = blob.discussions[idx].clone();
349                    save_discussions_blob(repo, &head, &blob)?;
350                    Ok(discussion_to_proto(&updated))
351                }
352            },
353        )
354        .await?;
355
356        Ok(Response::new(result))
357    }
358
359    async fn resolve_discussion(
360        &self,
361        request: Request<ResolveDiscussionRequest>,
362    ) -> Result<Response<ProtoDiscussion>, Status> {
363        let req = request.into_inner();
364        let req_bytes = req.encode_to_vec();
365        let client_op_id = req.client_operation_id.clone();
366        let inner = self.inner.clone();
367
368        let result = with_idempotency(
369            &self.inner,
370            &client_op_id,
371            "discussion.resolve",
372            &req_bytes,
373            move || {
374                let req = req.clone();
375                let inner = inner.clone();
376                async move {
377                    let repo = inner.repo();
378                    if req.discussion_id.is_empty() {
379                        return Err(Status::invalid_argument("discussion_id is required"));
380                    }
381                    // TODO(W2-followup): scan all states / oplog instead of HEAD-only.
382                    let head = head_state(repo)?;
383                    let mut blob = decode_blob_for_state(repo, &head)?;
384                    let idx = blob
385                        .discussions
386                        .iter()
387                        .position(|d| d.id == req.discussion_id)
388                        .ok_or_else(|| {
389                            Status::not_found(format!("discussion {} not found", req.discussion_id))
390                        })?;
391
392                    use grpc::heddle::v1::resolve_discussion_request::Resolution;
393                    let resolution = req
394                        .resolution
395                        .clone()
396                        .ok_or_else(|| Status::invalid_argument("resolution mode is required"))?;
397                    match resolution {
398                        Resolution::IntoAnnotation(_payload) => {
399                            // TODO(W2-followup): R5 wiring will create a real
400                            // `Annotation` from the discussion's content,
401                            // attribute it, and back-link it. For the first
402                            // ship we mint a placeholder id and record the
403                            // bidirectional link so the resolution shape is
404                            // honest about its terminal state.
405                            let annotation_id = ChangeId::generate().to_string_full();
406                            blob.discussions[idx].resolution =
407                                DiscussionResolution::ResolvedIntoAnnotation {
408                                    annotation_id: annotation_id.clone(),
409                                };
410                            blob.discussions[idx].resolved_annotation_id = Some(annotation_id);
411                        }
412                        Resolution::ByEdit(payload) => {
413                            let state_id =
414                                ChangeId::try_from_slice(&payload.state_id).map_err(|err| {
415                                    Status::invalid_argument(format!("invalid state_id: {err}"))
416                                })?;
417                            blob.discussions[idx].resolution =
418                                DiscussionResolution::ResolvedByEdit { state_id };
419                        }
420                        Resolution::Dismissed(payload) => {
421                            if payload.reason.trim().is_empty() {
422                                return Err(Status::invalid_argument(
423                                    "dismissal requires a non-empty reason",
424                                ));
425                            }
426                            blob.discussions[idx].resolution = DiscussionResolution::Dismissed {
427                                reason: payload.reason,
428                            };
429                        }
430                    }
431
432                    blob.discussions[idx]
433                        .validate()
434                        .map_err(|err| Status::invalid_argument(err.to_string()))?;
435                    let updated = blob.discussions[idx].clone();
436                    save_discussions_blob(repo, &head, &blob)?;
437                    Ok(discussion_to_proto(&updated))
438                }
439            },
440        )
441        .await?;
442
443        Ok(Response::new(result))
444    }
445
446    async fn list_by_state(
447        &self,
448        request: Request<ListDiscussionsByStateRequest>,
449    ) -> Result<Response<ListDiscussionsResponse>, Status> {
450        let req = request.into_inner();
451        let repo = self.inner.repo();
452        let (_, state) = load_state(repo, &req.state_id)?;
453        let blob = decode_blob_for_state(repo, &state)?;
454        let discussions = blob
455            .discussions
456            .iter()
457            .filter(|d| status_matches(d, &req.status))
458            .map(discussion_to_proto)
459            .collect();
460        Ok(Response::new(ListDiscussionsResponse { discussions }))
461    }
462
463    async fn list_by_symbol(
464        &self,
465        request: Request<ListDiscussionsBySymbolRequest>,
466    ) -> Result<Response<ListDiscussionsResponse>, Status> {
467        let req = request.into_inner();
468        let anchor = req
469            .anchor
470            .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
471        if anchor.file.is_empty() || anchor.symbol.is_empty() {
472            return Err(Status::invalid_argument(
473                "anchor.file and anchor.symbol are required",
474            ));
475        }
476        // TODO(W2-followup): cross-state symbol index. For now we only
477        // surface discussions attached to the HEAD state.
478        let repo = self.inner.repo();
479        let head = head_state(repo)?;
480        let blob = decode_blob_for_state(repo, &head)?;
481        let discussions = blob
482            .discussions
483            .iter()
484            .filter(|d| d.anchor.file == anchor.file && d.anchor.symbol == anchor.symbol)
485            .filter(|d| status_matches(d, &req.status))
486            .map(discussion_to_proto)
487            .collect();
488        Ok(Response::new(ListDiscussionsResponse { discussions }))
489    }
490
491    async fn get_discussion(
492        &self,
493        request: Request<GetDiscussionRequest>,
494    ) -> Result<Response<ProtoDiscussion>, Status> {
495        let req = request.into_inner();
496        if req.discussion_id.is_empty() {
497            return Err(Status::invalid_argument("discussion_id is required"));
498        }
499        // TODO(W2-followup): scan all states / oplog instead of HEAD-only.
500        let repo = self.inner.repo();
501        let head = head_state(repo)?;
502        let blob = decode_blob_for_state(repo, &head)?;
503        let discussion = blob
504            .discussions
505            .iter()
506            .find(|d| d.id == req.discussion_id)
507            .ok_or_else(|| {
508                Status::not_found(format!("discussion {} not found", req.discussion_id))
509            })?;
510        Ok(Response::new(discussion_to_proto(discussion)))
511    }
512}
513
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use objects::object::{Attribution, Principal};
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;

    use super::*;

    /// Builds a service over a freshly initialised repository in a temp dir.
    ///
    /// A single "seed" snapshot is taken so there is a real state to anchor
    /// discussions against; its id is returned alongside the service. The
    /// `TempDir` must be kept alive by the caller for the repo to stay on disk.
    fn fresh_service() -> (TempDir, ChangeId, LocalDiscussionService) {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        let author = Principal::new("Tester", "tester@example.com");
        let seed = repo
            .snapshot_with_attribution(Some("seed".into()), None, Attribution::human(author))
            .unwrap();
        let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
        let service =
            LocalDiscussionService::new(GrpcLocalService::new(Arc::new(repo), Arc::new(dedup)));
        (temp, seed.change_id, service)
    }

    /// `OpenDiscussionRequest` anchored at `src/lib.rs::foo` with default
    /// visibility and no thread.
    fn open_request(state_id: &ChangeId, body: &str, op_id: &str) -> OpenDiscussionRequest {
        let anchor = PathSymbolRef {
            file: "src/lib.rs".into(),
            symbol: "foo".into(),
        };
        OpenDiscussionRequest {
            repo_path: String::new(),
            state_id: state_id.as_bytes().to_vec(),
            anchor: Some(anchor),
            body: body.into(),
            visibility: String::new(),
            thread_ref: String::new(),
            client_operation_id: op_id.into(),
        }
    }

    /// `ListDiscussionsByStateRequest` for `state_id` with the given filter.
    fn list_request(state_id: &ChangeId, status: &str) -> ListDiscussionsByStateRequest {
        ListDiscussionsByStateRequest {
            repo_path: String::new(),
            state_id: state_id.as_bytes().to_vec(),
            status: status.into(),
        }
    }

    /// `ResolveDiscussionRequest` that dismisses `discussion_id` with `reason`
    /// and carries no client operation id.
    fn dismiss_request(discussion_id: String, reason: &str) -> ResolveDiscussionRequest {
        use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveDismissed};
        ResolveDiscussionRequest {
            repo_path: String::new(),
            discussion_id,
            resolution: Some(Resolution::Dismissed(ResolveDismissed {
                reason: reason.into(),
            })),
            client_operation_id: String::new(),
        }
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn open_then_append_turn_persists_both_turns() {
        let (_t, state_id, svc) = fresh_service();

        let opened = svc
            .open_discussion(Request::new(open_request(&state_id, "first", "")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(opened.turns.len(), 1);
        assert_eq!(opened.turns[0].body, "first");

        let append = AppendTurnRequest {
            repo_path: String::new(),
            discussion_id: opened.id.clone(),
            body: "second".into(),
            client_operation_id: String::new(),
        };
        let appended = svc
            .append_turn(Request::new(append))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(appended.turns.len(), 2);
        assert_eq!(appended.turns[0].body, "first");
        assert_eq!(appended.turns[1].body, "second");

        // Re-list to confirm the persisted state carries both turns. The
        // discussion was attached to the original state, so listing that
        // state still finds it.
        let listed = svc
            .list_by_state(Request::new(list_request(&state_id, "all")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(listed.discussions.len(), 1);
        assert_eq!(listed.discussions[0].turns.len(), 2);
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn open_idempotent_returns_same_discussion() {
        let (_t, state_id, svc) = fresh_service();
        let op_id = "11111111-2222-3333-4444-555555555555";

        // Same request, same client operation id, sent twice.
        let first = svc
            .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
            .await
            .unwrap()
            .into_inner();
        let second = svc
            .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(first.id, second.id);
        assert_eq!(first.turns.len(), 1);
        assert_eq!(second.turns.len(), 1);
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn resolve_dismissed_with_empty_reason_is_invalid_argument() {
        let (_t, state_id, svc) = fresh_service();
        let opened = svc
            .open_discussion(Request::new(open_request(&state_id, "why?", "")))
            .await
            .unwrap()
            .into_inner();

        // A whitespace-only reason counts as empty.
        let err = svc
            .resolve_discussion(Request::new(dismiss_request(opened.id, "   ")))
            .await
            .unwrap_err();
        assert_eq!(err.code(), tonic::Code::InvalidArgument);
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn list_by_state_filters_by_status() {
        let (_t, state_id, svc) = fresh_service();

        // Open two discussions, then dismiss the first.
        let a = svc
            .open_discussion(Request::new(open_request(&state_id, "a", "")))
            .await
            .unwrap()
            .into_inner();
        let _b = svc
            .open_discussion(Request::new(open_request(&state_id, "b", "")))
            .await
            .unwrap()
            .into_inner();
        svc.resolve_discussion(Request::new(dismiss_request(
            a.id.clone(),
            "no longer relevant",
        )))
        .await
        .unwrap();

        // `resolve_discussion` mutates the HEAD state's blob. This test never
        // takes another snapshot after `seed`, so HEAD is still `state_id`
        // and listing by `state_id` observes the dismissal. The assertions
        // below rely on that `repo.head() == state_id` assumption.
        let open_only = svc
            .list_by_state(Request::new(list_request(&state_id, "open")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(open_only.discussions.len(), 1);
        assert_eq!(open_only.discussions[0].turns[0].body, "b");

        let resolved_only = svc
            .list_by_state(Request::new(list_request(&state_id, "resolved")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(resolved_only.discussions.len(), 1);
        assert_eq!(resolved_only.discussions[0].turns[0].body, "a");

        let all = svc
            .list_by_state(Request::new(list_request(&state_id, "all")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(all.discussions.len(), 2);
    }
}