1use grpc::heddle::v1::{
15 AppendTurnRequest, Discussion as ProtoDiscussion,
16 DiscussionResolution as ProtoDiscussionResolution, DiscussionTurn as ProtoDiscussionTurn,
17 GetDiscussionRequest, ListDiscussionsByStateRequest, ListDiscussionsBySymbolRequest,
18 ListDiscussionsResponse, OpenDiscussionRequest, PathSymbolRef, ResolveDiscussionRequest,
19 discussion_service_server::DiscussionService,
20};
21use objects::object::{
22 AnnotationVisibility, Blob, ChangeId, Discussion, DiscussionResolution, DiscussionTurn,
23 DiscussionsBlob, Principal, State, SymbolAnchor,
24};
25use prost::Message;
26use repo::Repository;
27use tonic::{Request, Response, Status};
28
29use super::{GrpcLocalService, to_status, with_idempotency};
30
/// gRPC discussion service that serves requests from a local repository.
///
/// The type is a thin wrapper around a [`GrpcLocalService`]; cloning it clones
/// that wrapped handle.
#[derive(Clone)]
pub struct LocalDiscussionService {
    /// Local-service handle. The RPC handlers in this file obtain the
    /// repository (`inner.repo()`) and the idempotency store
    /// (`inner.dedup()`) through it.
    inner: GrpcLocalService,
}
35
36impl LocalDiscussionService {
37 pub fn new(inner: GrpcLocalService) -> Self {
38 Self { inner }
39 }
40}
41
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reads earlier than the epoch.
/// The `u64` second count is converted with a checked conversion and
/// saturates at `i64::MAX` rather than wrapping to a negative value.
fn now_secs() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
        })
}
48
49fn parse_visibility(s: &str) -> AnnotationVisibility {
56 match s {
57 "internal" => AnnotationVisibility::Internal,
58 "team_scoped" => AnnotationVisibility::TeamScoped {
59 team_id: String::new(),
60 },
61 "restricted" => AnnotationVisibility::Restricted {
62 scope_label: String::new(),
63 },
64 _ => AnnotationVisibility::Public,
66 }
67}
68
69fn turn_to_proto(turn: &DiscussionTurn) -> ProtoDiscussionTurn {
70 ProtoDiscussionTurn {
71 author_name: turn.author.name.clone(),
72 author_email: turn.author.email.clone(),
73 body: turn.body.clone(),
74 posted_at: Some(prost_types::Timestamp {
75 seconds: turn.posted_at,
76 nanos: 0,
77 }),
78 }
79}
80
81fn resolution_to_proto(resolution: &DiscussionResolution) -> ProtoDiscussionResolution {
82 use grpc::heddle::v1::discussion_resolution::{
83 Dismissed, Open, ResolvedByEdit, ResolvedIntoAnnotation, State,
84 };
85 let state = match resolution {
86 DiscussionResolution::Open => State::Open(Open {}),
87 DiscussionResolution::ResolvedIntoAnnotation { annotation_id } => {
88 State::IntoAnnotation(ResolvedIntoAnnotation {
89 annotation_id: annotation_id.clone(),
90 })
91 }
92 DiscussionResolution::ResolvedByEdit { state_id } => State::ByEdit(ResolvedByEdit {
93 state_id: state_id.as_bytes().to_vec(),
94 }),
95 DiscussionResolution::Dismissed { reason } => State::Dismissed(Dismissed {
96 reason: reason.clone(),
97 }),
98 };
99 ProtoDiscussionResolution { state: Some(state) }
100}
101
102fn discussion_to_proto(d: &Discussion) -> ProtoDiscussion {
103 ProtoDiscussion {
104 id: d.id.clone(),
105 anchor: Some(PathSymbolRef {
106 file: d.anchor.file.clone(),
107 symbol: d.anchor.symbol.clone(),
108 }),
109 opened_against_state: d.opened_against_state.as_bytes().to_vec(),
110 opened_at: Some(prost_types::Timestamp {
111 seconds: d.opened_at,
112 nanos: 0,
113 }),
114 thread_ref: d.thread_ref.clone().unwrap_or_default(),
115 turns: d.turns.iter().map(turn_to_proto).collect(),
116 resolution: Some(resolution_to_proto(&d.resolution)),
117 body_changed_since_open: d.body_changed_since_open,
118 orphaned: d.orphaned,
119 visibility: d.visibility.as_str().to_string(),
120 resolved_annotation_id: d.resolved_annotation_id.clone().unwrap_or_default(),
121 }
122}
123
124fn load_state(repo: &Repository, state_id: &[u8]) -> Result<(ChangeId, State), Status> {
127 let id = ChangeId::try_from_slice(state_id)
128 .map_err(|err| Status::invalid_argument(format!("invalid state_id: {err}")))?;
129 let state = repo
130 .store()
131 .get_state(&id)
132 .map_err(to_status)?
133 .ok_or_else(|| Status::not_found(format!("state {} not found", id.to_string_full())))?;
134 Ok((id, state))
135}
136
137fn decode_blob_for_state(repo: &Repository, state: &State) -> Result<DiscussionsBlob, Status> {
140 let Some(hash) = state.discussions else {
141 return Ok(DiscussionsBlob::new(Vec::new()));
142 };
143 let blob = repo
144 .store()
145 .get_blob(&hash)
146 .map_err(to_status)?
147 .ok_or_else(|| {
148 Status::not_found(format!(
149 "discussions blob {} referenced by state {} is missing",
150 hash,
151 state.change_id.to_string_full()
152 ))
153 })?;
154 DiscussionsBlob::decode(blob.content())
155 .map_err(|err| Status::internal(format!("failed to decode discussions blob: {err}")))
156}
157
158fn load_discussions_blob(
160 repo: &Repository,
161 state_id: &ChangeId,
162) -> Result<(State, DiscussionsBlob), Status> {
163 let state = repo
164 .store()
165 .get_state(state_id)
166 .map_err(to_status)?
167 .ok_or_else(|| {
168 Status::not_found(format!("state {} not found", state_id.to_string_full()))
169 })?;
170 let blob = decode_blob_for_state(repo, &state)?;
171 Ok((state, blob))
172}
173
174fn save_discussions_blob(
177 repo: &Repository,
178 state: &State,
179 blob: &DiscussionsBlob,
180) -> Result<State, Status> {
181 let bytes = blob
182 .encode()
183 .map_err(|err| Status::internal(format!("failed to encode discussions blob: {err}")))?;
184 let hash = repo
185 .store()
186 .put_blob(&Blob::new(bytes))
187 .map_err(to_status)?;
188 let new_state = state.clone().with_discussions(hash);
189 repo.store().put_state(&new_state).map_err(to_status)?;
190 Ok(new_state)
191}
192
193fn principal_for(repo: &Repository) -> Principal {
198 if let Some(pc) = &repo.config().principal {
199 Principal::new(&pc.name, &pc.email)
200 } else {
201 Principal::new("<unknown>", "")
202 }
203}
204
205fn head_state(repo: &Repository) -> Result<State, Status> {
208 let head_id = repo
209 .head()
210 .map_err(to_status)?
211 .ok_or_else(|| Status::failed_precondition("repository has no HEAD"))?;
212 repo.store()
213 .get_state(&head_id)
214 .map_err(to_status)?
215 .ok_or_else(|| {
216 Status::not_found(format!("HEAD state {} not found", head_id.to_string_full()))
217 })
218}
219
220fn status_matches(d: &Discussion, status: &str) -> bool {
223 match status {
224 "open" => d.is_open(),
225 "resolved" => !d.is_open(),
226 "orphaned" => d.orphaned,
227 _ => true,
229 }
230}
231
232#[tonic::async_trait]
233impl DiscussionService for LocalDiscussionService {
234 async fn open_discussion(
235 &self,
236 request: Request<OpenDiscussionRequest>,
237 ) -> Result<Response<ProtoDiscussion>, Status> {
238 let req = request.into_inner();
239 let req_bytes = req.encode_to_vec();
240 let client_op_id = req.client_operation_id.clone();
241 let inner = self.inner.clone();
242
243 let result = with_idempotency(
244 self.inner.dedup(),
245 &client_op_id,
246 "discussion.open",
247 &req_bytes,
248 |proto: &ProtoDiscussion| proto.encode_to_vec(),
249 |bytes| {
250 ProtoDiscussion::decode(bytes.as_slice())
251 .map_err(|e| Status::internal(e.to_string()))
252 },
253 move || {
254 let req = req.clone();
255 let inner = inner.clone();
256 async move {
257 let repo = inner.repo();
258 let anchor_proto = req
259 .anchor
260 .clone()
261 .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
262 if anchor_proto.file.is_empty() {
263 return Err(Status::invalid_argument("anchor.file is required"));
264 }
265 if anchor_proto.symbol.is_empty() {
266 return Err(Status::invalid_argument("anchor.symbol is required"));
267 }
268 if req.body.trim().is_empty() {
269 return Err(Status::invalid_argument("body must be non-empty"));
270 }
271 let opened_against =
272 ChangeId::try_from_slice(&req.state_id).map_err(|err| {
273 Status::invalid_argument(format!("invalid state_id: {err}"))
274 })?;
275 let (state, mut blob) = load_discussions_blob(repo, &opened_against)?;
276 let now = now_secs();
277 let principal = principal_for(repo);
278 let discussion = Discussion {
279 id: ChangeId::generate().to_string_full(),
280 anchor: SymbolAnchor::new(anchor_proto.file, anchor_proto.symbol),
281 opened_against_state: opened_against,
282 opened_at: now,
283 thread_ref: (!req.thread_ref.is_empty()).then(|| req.thread_ref.clone()),
284 turns: vec![DiscussionTurn {
285 author: principal,
286 body: req.body.clone(),
287 posted_at: now,
288 }],
289 resolution: DiscussionResolution::Open,
290 body_changed_since_open: false,
291 orphaned: false,
292 visibility: parse_visibility(&req.visibility),
293 resolved_annotation_id: None,
294 };
295 discussion
296 .validate()
297 .map_err(|err| Status::invalid_argument(err.to_string()))?;
298 blob.discussions.push(discussion.clone());
299 save_discussions_blob(repo, &state, &blob)?;
300 Ok(discussion_to_proto(&discussion))
301 }
302 },
303 )
304 .await?;
305
306 Ok(Response::new(result))
307 }
308
309 async fn append_turn(
310 &self,
311 request: Request<AppendTurnRequest>,
312 ) -> Result<Response<ProtoDiscussion>, Status> {
313 let req = request.into_inner();
314 let req_bytes = req.encode_to_vec();
315 let client_op_id = req.client_operation_id.clone();
316 let inner = self.inner.clone();
317
318 let result = with_idempotency(
319 self.inner.dedup(),
320 &client_op_id,
321 "discussion.append_turn",
322 &req_bytes,
323 |proto: &ProtoDiscussion| proto.encode_to_vec(),
324 |bytes| {
325 ProtoDiscussion::decode(bytes.as_slice())
326 .map_err(|e| Status::internal(e.to_string()))
327 },
328 move || {
329 let req = req.clone();
330 let inner = inner.clone();
331 async move {
332 let repo = inner.repo();
333 if req.discussion_id.is_empty() {
334 return Err(Status::invalid_argument("discussion_id is required"));
335 }
336 if req.body.trim().is_empty() {
337 return Err(Status::invalid_argument("body must be non-empty"));
338 }
339 let head = head_state(repo)?;
341 let mut blob = decode_blob_for_state(repo, &head)?;
342 let idx = blob
343 .discussions
344 .iter()
345 .position(|d| d.id == req.discussion_id)
346 .ok_or_else(|| {
347 Status::not_found(format!("discussion {} not found", req.discussion_id))
348 })?;
349 let principal = principal_for(repo);
350 blob.discussions[idx].turns.push(DiscussionTurn {
351 author: principal,
352 body: req.body.clone(),
353 posted_at: now_secs(),
354 });
355 blob.discussions[idx]
356 .validate()
357 .map_err(|err| Status::invalid_argument(err.to_string()))?;
358 let updated = blob.discussions[idx].clone();
359 save_discussions_blob(repo, &head, &blob)?;
360 Ok(discussion_to_proto(&updated))
361 }
362 },
363 )
364 .await?;
365
366 Ok(Response::new(result))
367 }
368
369 async fn resolve_discussion(
370 &self,
371 request: Request<ResolveDiscussionRequest>,
372 ) -> Result<Response<ProtoDiscussion>, Status> {
373 let req = request.into_inner();
374 let req_bytes = req.encode_to_vec();
375 let client_op_id = req.client_operation_id.clone();
376 let inner = self.inner.clone();
377
378 let result = with_idempotency(
379 self.inner.dedup(),
380 &client_op_id,
381 "discussion.resolve",
382 &req_bytes,
383 |proto: &ProtoDiscussion| proto.encode_to_vec(),
384 |bytes| {
385 ProtoDiscussion::decode(bytes.as_slice())
386 .map_err(|e| Status::internal(e.to_string()))
387 },
388 move || {
389 let req = req.clone();
390 let inner = inner.clone();
391 async move {
392 let repo = inner.repo();
393 if req.discussion_id.is_empty() {
394 return Err(Status::invalid_argument("discussion_id is required"));
395 }
396 let head = head_state(repo)?;
398 let mut blob = decode_blob_for_state(repo, &head)?;
399 let idx = blob
400 .discussions
401 .iter()
402 .position(|d| d.id == req.discussion_id)
403 .ok_or_else(|| {
404 Status::not_found(format!("discussion {} not found", req.discussion_id))
405 })?;
406
407 use grpc::heddle::v1::resolve_discussion_request::Resolution;
408 let resolution = req
409 .resolution
410 .clone()
411 .ok_or_else(|| Status::invalid_argument("resolution mode is required"))?;
412 match resolution {
413 Resolution::IntoAnnotation(_payload) => {
414 let annotation_id = ChangeId::generate().to_string_full();
421 blob.discussions[idx].resolution =
422 DiscussionResolution::ResolvedIntoAnnotation {
423 annotation_id: annotation_id.clone(),
424 };
425 blob.discussions[idx].resolved_annotation_id = Some(annotation_id);
426 }
427 Resolution::ByEdit(payload) => {
428 let state_id =
429 ChangeId::try_from_slice(&payload.state_id).map_err(|err| {
430 Status::invalid_argument(format!("invalid state_id: {err}"))
431 })?;
432 blob.discussions[idx].resolution =
433 DiscussionResolution::ResolvedByEdit { state_id };
434 }
435 Resolution::Dismissed(payload) => {
436 if payload.reason.trim().is_empty() {
437 return Err(Status::invalid_argument(
438 "dismissal requires a non-empty reason",
439 ));
440 }
441 blob.discussions[idx].resolution = DiscussionResolution::Dismissed {
442 reason: payload.reason,
443 };
444 }
445 }
446
447 blob.discussions[idx]
448 .validate()
449 .map_err(|err| Status::invalid_argument(err.to_string()))?;
450 let updated = blob.discussions[idx].clone();
451 save_discussions_blob(repo, &head, &blob)?;
452 Ok(discussion_to_proto(&updated))
453 }
454 },
455 )
456 .await?;
457
458 Ok(Response::new(result))
459 }
460
461 async fn list_by_state(
462 &self,
463 request: Request<ListDiscussionsByStateRequest>,
464 ) -> Result<Response<ListDiscussionsResponse>, Status> {
465 let req = request.into_inner();
466 let repo = self.inner.repo();
467 let (_, state) = load_state(repo, &req.state_id)?;
468 let blob = decode_blob_for_state(repo, &state)?;
469 let discussions = blob
470 .discussions
471 .iter()
472 .filter(|d| status_matches(d, &req.status))
473 .map(discussion_to_proto)
474 .collect();
475 Ok(Response::new(ListDiscussionsResponse { discussions }))
476 }
477
478 async fn list_by_symbol(
479 &self,
480 request: Request<ListDiscussionsBySymbolRequest>,
481 ) -> Result<Response<ListDiscussionsResponse>, Status> {
482 let req = request.into_inner();
483 let anchor = req
484 .anchor
485 .ok_or_else(|| Status::invalid_argument("anchor is required"))?;
486 if anchor.file.is_empty() || anchor.symbol.is_empty() {
487 return Err(Status::invalid_argument(
488 "anchor.file and anchor.symbol are required",
489 ));
490 }
491 let repo = self.inner.repo();
494 let head = head_state(repo)?;
495 let blob = decode_blob_for_state(repo, &head)?;
496 let discussions = blob
497 .discussions
498 .iter()
499 .filter(|d| d.anchor.file == anchor.file && d.anchor.symbol == anchor.symbol)
500 .filter(|d| status_matches(d, &req.status))
501 .map(discussion_to_proto)
502 .collect();
503 Ok(Response::new(ListDiscussionsResponse { discussions }))
504 }
505
506 async fn get_discussion(
507 &self,
508 request: Request<GetDiscussionRequest>,
509 ) -> Result<Response<ProtoDiscussion>, Status> {
510 let req = request.into_inner();
511 if req.discussion_id.is_empty() {
512 return Err(Status::invalid_argument("discussion_id is required"));
513 }
514 let repo = self.inner.repo();
516 let head = head_state(repo)?;
517 let blob = decode_blob_for_state(repo, &head)?;
518 let discussion = blob
519 .discussions
520 .iter()
521 .find(|d| d.id == req.discussion_id)
522 .ok_or_else(|| {
523 Status::not_found(format!("discussion {} not found", req.discussion_id))
524 })?;
525 Ok(Response::new(discussion_to_proto(discussion)))
526 }
527}
528
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use grpc::heddle::v1::resolve_discussion_request::{Resolution, ResolveDismissed};
    use objects::object::{Attribution, Principal};
    use repo::{Repository, operation_dedup::OperationDedupStore};
    use tempfile::TempDir;

    use super::*;

    /// Creates a repository in a temp dir with one seed snapshot and returns
    /// the temp dir (callers keep it bound for the duration of the test), the
    /// seed state's id, and a service wired to that repository.
    fn fresh_service() -> (TempDir, ChangeId, LocalDiscussionService) {
        let workdir = TempDir::new().unwrap();
        let repository = Repository::init_default(workdir.path()).unwrap();
        let seeded = repository
            .snapshot_with_attribution(
                Some("seed".into()),
                None,
                Attribution::human(Principal::new("Tester", "tester@example.com")),
            )
            .unwrap();
        let dedup_store = OperationDedupStore::open(repository.heddle_dir()).unwrap();
        let service = LocalDiscussionService::new(GrpcLocalService::new(
            Arc::new(repository),
            Arc::new(dedup_store),
        ));
        (workdir, seeded.change_id, service)
    }

    /// Builds an open request anchored at `src/lib.rs` / `foo` with default
    /// visibility and no thread reference.
    fn open_request(state_id: &ChangeId, body: &str, op_id: &str) -> OpenDiscussionRequest {
        let anchor = PathSymbolRef {
            file: String::from("src/lib.rs"),
            symbol: String::from("foo"),
        };
        OpenDiscussionRequest {
            repo_path: String::new(),
            state_id: state_id.as_bytes().to_vec(),
            anchor: Some(anchor),
            body: body.to_string(),
            visibility: String::new(),
            thread_ref: String::new(),
            client_operation_id: op_id.to_string(),
        }
    }

    /// Builds a list-by-state request for `state_id` with the given filter.
    fn list_request(state_id: &ChangeId, status: &str) -> ListDiscussionsByStateRequest {
        ListDiscussionsByStateRequest {
            repo_path: String::new(),
            state_id: state_id.as_bytes().to_vec(),
            status: status.to_string(),
        }
    }

    /// Builds a dismissal request without a client operation id.
    fn dismiss_request(discussion_id: String, reason: &str) -> ResolveDiscussionRequest {
        ResolveDiscussionRequest {
            repo_path: String::new(),
            discussion_id,
            resolution: Some(Resolution::Dismissed(ResolveDismissed {
                reason: reason.to_string(),
            })),
            client_operation_id: String::new(),
        }
    }

    #[tokio::test]
    async fn open_then_append_turn_persists_both_turns() {
        let (_t, state_id, svc) = fresh_service();

        let opened = svc
            .open_discussion(Request::new(open_request(&state_id, "first", "")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(opened.turns.len(), 1);
        assert_eq!(opened.turns[0].body, "first");

        let append = AppendTurnRequest {
            repo_path: String::new(),
            discussion_id: opened.id.clone(),
            body: "second".into(),
            client_operation_id: String::new(),
        };
        let appended = svc
            .append_turn(Request::new(append))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(appended.turns.len(), 2);
        assert_eq!(appended.turns[0].body, "first");
        assert_eq!(appended.turns[1].body, "second");

        // Reading the state back must show the appended turn as well.
        let listed = svc
            .list_by_state(Request::new(list_request(&state_id, "all")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(listed.discussions.len(), 1);
        assert_eq!(listed.discussions[0].turns.len(), 2);
    }

    #[tokio::test]
    async fn open_idempotent_returns_same_discussion() {
        let (_t, state_id, svc) = fresh_service();
        let op_id = "11111111-2222-3333-4444-555555555555";

        let first = svc
            .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
            .await
            .unwrap()
            .into_inner();
        let second = svc
            .open_discussion(Request::new(open_request(&state_id, "hello", op_id)))
            .await
            .unwrap()
            .into_inner();

        assert_eq!(first.id, second.id);
        assert_eq!(first.turns.len(), 1);
        assert_eq!(second.turns.len(), 1);
    }

    #[tokio::test]
    async fn resolve_dismissed_with_empty_reason_is_invalid_argument() {
        let (_t, state_id, svc) = fresh_service();
        let opened = svc
            .open_discussion(Request::new(open_request(&state_id, "why?", "")))
            .await
            .unwrap()
            .into_inner();

        // A whitespace-only reason must be rejected.
        let err = svc
            .resolve_discussion(Request::new(dismiss_request(opened.id, "   ")))
            .await
            .unwrap_err();
        assert_eq!(err.code(), tonic::Code::InvalidArgument);
    }

    #[tokio::test]
    async fn list_by_state_filters_by_status() {
        let (_t, state_id, svc) = fresh_service();

        let a = svc
            .open_discussion(Request::new(open_request(&state_id, "a", "")))
            .await
            .unwrap()
            .into_inner();
        let _b = svc
            .open_discussion(Request::new(open_request(&state_id, "b", "")))
            .await
            .unwrap()
            .into_inner();

        // Resolve "a" so exactly one discussion is open and one is resolved.
        svc.resolve_discussion(Request::new(dismiss_request(
            a.id.clone(),
            "no longer relevant",
        )))
        .await
        .unwrap();

        let open_only = svc
            .list_by_state(Request::new(list_request(&state_id, "open")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(open_only.discussions.len(), 1);
        assert_eq!(open_only.discussions[0].turns[0].body, "b");

        let resolved_only = svc
            .list_by_state(Request::new(list_request(&state_id, "resolved")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(resolved_only.discussions.len(), 1);
        assert_eq!(resolved_only.discussions[0].turns[0].body, "a");

        let all = svc
            .list_by_state(Request::new(list_request(&state_id, "all")))
            .await
            .unwrap()
            .into_inner();
        assert_eq!(all.discussions.len(), 2);
    }
}