1use grpc::heddle::v1::{
15 AppendTurnRequest, Discussion as ProtoDiscussion,
16 DiscussionResolution as ProtoDiscussionResolution, DiscussionTurn as ProtoDiscussionTurn,
17 GetDiscussionRequest, ListDiscussionsByStateRequest, ListDiscussionsBySymbolRequest,
18 ListDiscussionsResponse, OpenDiscussionRequest, PathSymbolRef, ResolveDiscussionRequest,
19 discussion_service_server::DiscussionService,
20};
21use objects::{
22 object::{
23 Blob, ChangeId, Discussion, DiscussionResolution, DiscussionTurn, DiscussionsBlob,
24 Principal, State, SymbolAnchor, VisibilityTier,
25 },
26 store::ObjectStore,
27};
28use prost::Message;
29use repo::Repository;
30use tonic::{Request, Response, Status};
31
32use super::{GrpcLocalService, to_status, with_idempotency};
33
34#[derive(Clone)]
35pub struct LocalDiscussionService {
36 inner: GrpcLocalService,
37}
38
39impl LocalDiscussionService {
40 pub fn new(inner: GrpcLocalService) -> Self {
41 Self { inner }
42 }
43}
44
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_secs() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
51
52fn parse_visibility(s: &str) -> VisibilityTier {
59 match s {
60 "internal" => VisibilityTier::Internal,
61 "team_scoped" => VisibilityTier::TeamScoped {
62 team_id: String::new(),
63 },
64 "restricted" => VisibilityTier::Restricted {
65 scope_label: String::new(),
66 },
67 _ => VisibilityTier::Public,
69 }
70}
71
72fn turn_to_proto(turn: &DiscussionTurn) -> ProtoDiscussionTurn {
73 ProtoDiscussionTurn {
74 author_name: turn.author.name.clone(),
75 author_email: turn.author.email.clone(),
76 body: turn.body.clone(),
77 posted_at: Some(prost_types::Timestamp {
78 seconds: turn.posted_at,
79 nanos: 0,
80 }),
81 }
82}
83
84fn resolution_to_proto(resolution: &DiscussionResolution) -> ProtoDiscussionResolution {
85 use grpc::heddle::v1::discussion_resolution::{
86 Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
87 };
88 let state = match resolution {
89 DiscussionResolution::Open => State::Open(Open {}),
90 DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
91 State::IntoAnnotation(ResolvedIntoAnnotation {
92 annotation_id: annotation_id.clone(),
93 })
94 }
95 DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
96 state_id: state_id.as_bytes().to_vec(),
97 }),
98 DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
99 reason: reason.clone(),
100 }),
101 };
102 ProtoDiscussionResolution { state: Some(state) }
103}
104
105fn discussion_to_proto(d: &Discussion) -> ProtoDiscussion {
106 ProtoDiscussion {
107 id: d.id.clone(),
108 anchor: Some(PathSymbolRef {
109 file: d.anchor.file.clone(),
110 symbol: d.anchor.symbol.clone(),
111 }),
112 opened_against_state: d.opened_against_state.as_bytes().to_vec(),
113 opened_at: Some(prost_types::Timestamp {
114 seconds: d.opened_at,
115 nanos: 0,
116 }),
117 thread_ref: d.thread_ref.clone().unwrap_or_default(),
118 turns: d.turns.iter().map(turn_to_proto).collect(),
119 resolution: Some(resolution_to_proto(&d.resolution)),
120 body_changed_since_open: d.body_changed_since_open,
121 orphaned: d.orphaned,
122 visibility: d.visibility.as_str().to_string(),
123 resolved_annotation_id: d.resolved_annotation_id.clone().unwrap_or_default(),
124 }
125}
126
127fn load_state(repo: &Repository, state_id: &[u8]) -> Result<(ChangeId, State), Status> {
130 let id = ChangeId::try_from_slice(state_id)
131 .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))?;
132 let state = repo
133 .store()
134 .get_state(&id)
135 .map_err(to_status)?
136 .ok_or_else(|| Status::not_found(format!("state {} not found", id.to_string_full())))?;
137 Ok((id, state))
138}
139
140fn decode_blob_for_state(repo: &Repository, state: &State) -> Result<DiscussionsBlob, Status> {
143 let Some(hash) = state.discussions else {
144 return Ok(DiscussionsBlob::new(Vec::new()));
145 };
146 let blob = repo
147 .store()
148 .get_blob(&hash)
149 .map_err(to_status)?
150 .ok_or_else(|| {
151 Status::not_found(format!(
152 "discussions blob {} referenced by state {} is missing",
153 hash,
154 state.change_id.to_string_full()
155 ))
156 })?;
157 DiscussionsBlob::decode(blob.content())
158 .map_err(|err| Status::internal(format!("failed to decode discussions blob: {err}")))
159}
160
161fn load_discussions_blob(
163 repo: &Repository,
164 state_id: &ChangeId,
165) -> Result<(State, DiscussionsBlob), Status> {
166 let state = repo
167 .store()
168 .get_state(state_id)
169 .map_err(to_status)?
170 .ok_or_else(|| {
171 Status::not_found(format!("state {} not found", state_id.to_string_full()))
172 })?;
173 let blob = decode_blob_for_state(repo, &state)?;
174 Ok((state, blob))
175}
176
177fn save_discussions_blob(
180 repo: &Repository,
181 state: &State,
182 blob: &DiscussionsBlob,
183) -> Result<State, Status> {
184 let bytes = blob
185 .encode()
186 .map_err(|err| Status::internal(format!("failed to encode discussions blob: {err}")))?;
187 let hash = repo
188 .store()
189 .put_blob(&Blob::new(bytes))
190 .map_err(to_status)?;
191 let new_state = state.clone().with_discussions(hash);
192 repo.store().put_state(&new_state).map_err(to_status)?;
193 Ok(new_state)
194}
195
196fn principal_for(repo: &Repository) -> Principal {
201 repo.get_principal()
202 .unwrap_or_else(|_| Principal::new("<unknown>", ""))
203}
204
205fn head_state(repo: &Repository) -> Result<State, Status> {
208 let head_id = repo
209 .head()
210 .map_err(to_status)?
211 .ok_or_else(|| Status::failed_precondition("repository has no HEAD"))?;
212 repo.store()
213 .get_state(&head_id)
214 .map_err(to_status)?
215 .ok_or_else(|| {
216 Status::not_found(format!("HEAD state {} not found", head_id.to_string_full()))
217 })
218}
219
220fn status_matches(d: &Discussion, status: &str) -> bool {
223 match status {
224 "open" => d.is_open(),
225 "resolved" => !d.is_open(),
226 "orphaned" => d.orphaned,
227 _ => true,
229 }
230}
231
232#[tonic::async_trait]
233impl DiscussionService for LocalDiscussionService {
234 async fn open_discussion(
235 &self,
236 request: Request<OpenDiscussionRequest>,
237 ) -> Result<Response<ProtoDiscussion>, Status> {
238 let req = request.into_inner();
239 let req_bytes = req.encode_to_vec();
240 let client_op_id = req.client_operation_id.clone();
241 let inner = self.inner.clone();
242
243 let result = with_idempotency(
244 &self.inner,
245 &client_op_id,
246 "discussion.open",
247 &req_bytes,
248 move || {
249 let req = req.clone();
250 let inner = inner.clone();
251 async move {
252 let repo = inner.repo();
253 let anchor_proto = req
254 .anchor
255 .clone()
256 .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
257 if anchor_proto.file.is_empty() {
258 return Err(Status::invalid_argument("anchor.file is required"));
259 }
260 if anchor_proto.symbol.is_empty() {
261 return Err(Status::invalid_argument("anchor.symbol is required"));
262 }
263 if req.body.trim().is_empty() {
264 return Err(Status::invalid_argument("body must be non-empty"));
265 }
266 let opened_against =
267 ChangeId::try_from_slice(&req.state_id).map_err(|err| {
268 Status::invalid_argument(format!("invalid state_id: {err}"))
269 })?;
270 let (state, mut blob) = load_discussions_blob(repo, &opened_against)?;
271 let now = now_secs();
272 let principal = principal_for(repo);
273 let discussion = Discussion {
274 id: ChangeId::generate().to_string_full(),
275 anchor: SymbolAnchor::new(anchor_proto.file, anchor_proto.symbol),
276 opened_against_state: opened_against,
277 opened_at: now,
278 thread_ref: (!req.thread_ref.is_empty()).then(|| req.thread_ref.clone()),
279 turns: vec![DiscussionTurn {
280 author: principal,
281 body: req.body.clone(),
282 posted_at: now,
283 }],
284 resolution: DiscussionResolution::Open,
285 body_changed_since_open: false,
286 orphaned: false,
287 visibility: parse_visibility(&req.visibility),
288 resolved_annotation_id: None,
289 };
290 discussion
291 .validate()
292 .map_err(|err| Status::invalid_argument(err.to_string()))?;
293 blob.discussions.push(discussion.clone());
294 save_discussions_blob(repo, &state, &blob)?;
295 Ok(discussion_to_proto(&discussion))
296 }
297 },
298 )
299 .await?;
300
301 Ok(Response::new(result))
302 }
303
304 async fn append_turn(
305 &self,
306 request: Request<AppendTurnRequest>,
307 ) -> Result<Response<ProtoDiscussion>, Status> {
308 let req = request.into_inner();
309 let req_bytes = req.encode_to_vec();
310 let client_op_id = req.client_operation_id.clone();
311 let inner = self.inner.clone();
312
313 let result = with_idempotency(
314 &self.inner,
315 &client_op_id,
316 "discussion.append_turn",
317 &req_bytes,
318 move || {
319 let req = req.clone();
320 let inner = inner.clone();
321 async move {
322 let repo = inner.repo();
323 if req.discussion_id.is_empty() {
324 return Err(Status::invalid_argument("discussion_id is required"));
325 }
326 if req.body.trim().is_empty() {
327 return Err(Status::invalid_argument("body must be non-empty"));
328 }
329 let head = head_state(repo)?;
331 let mut blob = decode_blob_for_state(repo, &head)?;
332 let idx = blob
333 .discussions
334 .iter()
335 .position(|d| d.id == req.discussion_id)
336 .ok_or_else(|| {
337 Status::not_found(format!("discussion {} not found", req.discussion_id))
338 })?;
339 let principal = principal_for(repo);
340 blob.discussions[idx].turns.push(DiscussionTurn {
341 author: principal,
342 body: req.body.clone(),
343 posted_at: now_secs(),
344 });
345 blob.discussions[idx]
346 .validate()
347 .map_err(|err| Status::invalid_argument(err.to_string()))?;
348 let updated = blob.discussions[idx].clone();
349 save_discussions_blob(repo, &head, &blob)?;
350 Ok(discussion_to_proto(&updated))
351 }
352 },
353 )
354 .await?;
355
356 Ok(Response::new(result))
357 }
358
359 async fn resolve_discussion(
360 &self,
361 request: Request<ResolveDiscussionRequest>,
362 ) -> Result<Response<ProtoDiscussion>, Status> {
363 let req = request.into_inner();
364 let req_bytes = req.encode_to_vec();
365 let client_op_id = req.client_operation_id.clone();
366 let inner = self.inner.clone();
367
368 let result = with_idempotency(
369 &self.inner,
370 &client_op_id,
371 "discussion.resolve",
372 &req_bytes,
373 move || {
374 let req = req.clone();
375 let inner = inner.clone();
376 async move {
377 let repo = inner.repo();
378 if req.discussion_id.is_empty() {
379 return Err(Status::invalid_argument("discussion_id is required"));
380 }
381 let head = head_state(repo)?;
383 let mut blob = decode_blob_for_state(repo, &head)?;
384 let idx = blob
385 .discussions
386 .iter()
387 .position(|d| d.id == req.discussion_id)
388 .ok_or_else(|| {
389 Status::not_found(format!("discussion {} not found", req.discussion_id))
390 })?;
391
392 use grpc::heddle::v1::resolve_discussion_request::Resolution;
393 let resolution = req
394 .resolution
395 .clone()
396 .ok_or_else(|| Status::invalid_argument("resolution mode is required"))?;
397 match resolution {
398 Resolution::IntoAnnotation(_payload) => {
399 let annotation_id = ChangeId::generate().to_string_full();
406 blob.discussions[idx].resolution =
407 DiscussionResolution::ResolvedIntoAnnotation {
408 annotation_id: annotation_id.clone(),
409 };
410 blob.discussions[idx].resolved_annotation_id = Some(annotation_id);
411 }
412 Resolution::ByEdit(payload) => {
413 let state_id =
414 ChangeId::try_from_slice(&payload.state_id).map_err(|err| {
415 Status::invalid_argument(format!("invalid state_id: {err}"))
416 })?;
417 blob.discussions[idx].resolution =
418 DiscussionResolution::ResolvedByEdit { state_id };
419 }
420 Resolution::Dismissed(payload) => {
421 if payload.reason.trim().is_empty() {
422 return Err(Status::invalid_argument(
423 "dismissal requires a non-empty reason",
424 ));
425 }
426 blob.discussions[idx].resolution = DiscussionResolution::Dismissed {
427 reason: payload.reason,
428 };
429 }
430 }
431
432 blob.discussions[idx]
433 .validate()
434 .map_err(|err| Status::invalid_argument(err.to_string()))?;
435 let updated = blob.discussions[idx].clone();
436 save_discussions_blob(repo, &head, &blob)?;
437 Ok(discussion_to_proto(&updated))
438 }
439 },
440 )
441 .await?;
442
443 Ok(Response::new(result))
444 }
445
446 async fn list_by_state(
447 &self,
448 request: Request<ListDiscussionsByStateRequest>,
449 ) -> Result<Response<ListDiscussionsResponse>, Status> {
450 let req = request.into_inner();
451 let repo = self.inner.repo();
452 let (_, state) = load_state(repo, &req.state_id)?;
453 let blob = decode_blob_for_state(repo, &state)?;
454 let discussions = blob
455 .discussions
456 .iter()
457 .filter(|d| status_matches(d, &req.status))
458 .map(discussion_to_proto)
459 .collect();
460 Ok(Response::new(ListDiscussionsResponse { discussions }))
461 }
462
463 async fn list_by_symbol(
464 &self,
465 request: Request<ListDiscussionsBySymbolRequest>,
466 ) -> Result<Response<ListDiscussionsResponse>, Status> {
467 let req = request.into_inner();
468 let anchor = req
469 .anchor
470 .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
471 if anchor.file.is_empty() || anchor.symbol.is_empty() {
472 return Err(Status::invalid_argument(
473 "anchor.file and anchor.symbol are required",
474 ));
475 }
476 let repo = self.inner.repo();
479 let head = head_state(repo)?;
480 let blob = decode_blob_for_state(repo, &head)?;
481 let discussions = blob
482 .discussions
483 .iter()
484 .filter(|d| d.anchor.file == anchor.file && d.anchor.symbol == anchor.symbol)
485 .filter(|d| status_matches(d, &req.status))
486 .map(discussion_to_proto)
487 .collect();
488 Ok(Response::new(ListDiscussionsResponse { discussions }))
489 }
490
491 async fn get_discussion(
492 &self,
493 request: Request<GetDiscussionRequest>,
494 ) -> Result<Response<ProtoDiscussion>, Status> {
495 let req = request.into_inner();
496 if req.discussion_id.is_empty() {
497 return Err(Status::invalid_argument("discussion_id is required"));
498 }
499 let repo = self.inner.repo();
501 let head = head_state(repo)?;
502 let blob = decode_blob_for_state(repo, &head)?;
503 let discussion = blob
504 .discussions
505 .iter()
506 .find(|d| d.id == req.discussion_id)
507 .ok_or_else(|| {
508 Status::not_found(format!("discussion {} not found", req.discussion_id))
509 })?;
510 Ok(Response::new(discussion_to_proto(discussion)))
511 }
512}
513
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use objects::object::{Attribution, Principal};
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;

    use super::*;

    /// Builds a service over a fresh temporary repository holding one "seed"
    /// snapshot.
    ///
    /// Returns the temp dir (callers keep it bound for the whole test), the seed
    /// snapshot's change id, and the service.
    fn fresh_service() -> (TempDir, ChangeId, LocalDiscussionService) {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        let tester = Principal::new("Tester", "tester@example.com");
        let seed = repo
            .snapshot_with_attribution(Some("seed".into()), None, Attribution::human(tester))
            .unwrap();
        let dedup = OperationDedupStore::open(repo.heddle_dir()).unwrap();
        let service =
            LocalDiscussionService::new(GrpcLocalService::new(Arc::new(repo), Arc::new(dedup)));
        (temp, seed.change_id, service)
    }

    /// Open request anchored at `src/lib.rs` / `foo` with default visibility and
    /// no thread reference.
    fn open_request(state_id: &ChangeId, body: &str, op_id: &str) -> OpenDiscussionRequest {
        let anchor = PathSymbolRef {
            file: "src/lib.rs".into(),
            symbol: "foo".into(),
        };
        OpenDiscussionRequest {
            repo_path: String::new(),
            state_id: state_id.as_bytes().to_vec(),
            anchor: Some(anchor),
            body: body.into(),
            visibility: String::new(),
            thread_ref: String::new(),
            client_operation_id: op_id.into(),
        }
    }

    /// Dismissal request for `discussion_id` without a client operation id.
    fn dismiss_request(discussion_id: &str, reason: &str) -> ResolveDiscussionRequest {
        use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveDismissed};

        ResolveDiscussionRequest {
            repo_path: String::new(),
            discussion_id: discussion_id.into(),
            resolution: Some(Resolution::Dismissed(ResolveDismissed {
                reason: reason.into(),
            })),
            client_operation_id: String::new(),
        }
    }

    /// Opens a discussion and unwraps the response.
    async fn open(
        svc: &LocalDiscussionService,
        state_id: &ChangeId,
        body: &str,
        op_id: &str,
    ) -> ProtoDiscussion {
        svc.open_discussion(Request::new(open_request(state_id, body, op_id)))
            .await
            .unwrap()
            .into_inner()
    }

    /// Lists the discussions of `state_id` under the given status filter.
    async fn list(
        svc: &LocalDiscussionService,
        state_id: &ChangeId,
        status: &str,
    ) -> ListDiscussionsResponse {
        svc.list_by_state(Request::new(ListDiscussionsByStateRequest {
            repo_path: String::new(),
            state_id: state_id.as_bytes().to_vec(),
            status: status.into(),
        }))
        .await
        .unwrap()
        .into_inner()
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn open_then_append_turn_persists_both_turns() {
        let (_t, state_id, svc) = fresh_service();

        let opened = open(&svc, &state_id, "first", "").await;
        assert_eq!(opened.turns.len(), 1);
        assert_eq!(opened.turns[0].body, "first");

        let append = AppendTurnRequest {
            repo_path: String::new(),
            discussion_id: opened.id.clone(),
            body: "second".into(),
            client_operation_id: String::new(),
        };
        let appended = svc
            .append_turn(Request::new(append))
            .await
            .unwrap()
            .into_inner();
        let bodies: Vec<&str> = appended.turns.iter().map(|t| t.body.as_str()).collect();
        assert_eq!(bodies, ["first", "second"]);

        // Both turns must be visible when the state is read back.
        let listed = list(&svc, &state_id, "all").await;
        assert_eq!(listed.discussions.len(), 1);
        assert_eq!(listed.discussions[0].turns.len(), 2);
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn open_idempotent_returns_same_discussion() {
        let (_t, state_id, svc) = fresh_service();
        let op_id = "11111111-2222-3333-4444-555555555555";

        // The same request with the same operation id is sent twice.
        let first = open(&svc, &state_id, "hello", op_id).await;
        let second = open(&svc, &state_id, "hello", op_id).await;

        assert_eq!(first.id, second.id);
        assert_eq!(first.turns.len(), 1);
        assert_eq!(second.turns.len(), 1);
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn resolve_dismissed_with_empty_reason_is_invalid_argument() {
        let (_t, state_id, svc) = fresh_service();
        let opened = open(&svc, &state_id, "why?", "").await;

        // A whitespace-only reason counts as empty.
        let err = svc
            .resolve_discussion(Request::new(dismiss_request(&opened.id, " ")))
            .await
            .unwrap_err();
        assert_eq!(err.code(), tonic::Code::InvalidArgument);
    }

    #[tokio::test]
    #[serial_test::serial(process_global)]
    async fn list_by_state_filters_by_status() {
        let (_t, state_id, svc) = fresh_service();
        let a = open(&svc, &state_id, "a", "").await;
        let _b = open(&svc, &state_id, "b", "").await;

        svc.resolve_discussion(Request::new(dismiss_request(&a.id, "no longer relevant")))
            .await
            .unwrap();

        let open_only = list(&svc, &state_id, "open").await;
        assert_eq!(open_only.discussions.len(), 1);
        assert_eq!(open_only.discussions[0].turns[0].body, "b");

        let resolved_only = list(&svc, &state_id, "resolved").await;
        assert_eq!(resolved_only.discussions.len(), 1);
        assert_eq!(resolved_only.discussions[0].turns[0].body, "a");

        let all = list(&svc, &state_id, "all").await;
        assert_eq!(all.discussions.len(), 2);
    }
}