Skip to main content

guts_p2p/
collaboration_message.rs

1//! P2P protocol messages for collaboration replication (PRs, Issues, Comments, Reviews).
2
3use bytes::{Buf, BufMut, Bytes, BytesMut};
4use guts_collaboration::{
5    Comment, CommentTarget, Issue, IssueState, Label, PullRequest, PullRequestState, Review,
6    ReviewState,
7};
8use guts_storage::ObjectId;
9use serde::{Deserialize, Serialize};
10
11use crate::{P2PError, Result};
12
/// Collaboration message type discriminator.
///
/// Each variant's discriminant is the tag byte that
/// `CollaborationMessage::encode` writes as the first byte of an encoded
/// message, and that `CollaborationMessageType::from_byte` maps back to a
/// variant when decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CollaborationMessageType {
    /// Pull request created (tag 10).
    PullRequestCreated = 10,
    /// Pull request updated (tag 11).
    PullRequestUpdated = 11,
    /// Issue created (tag 12).
    IssueCreated = 12,
    /// Issue updated (tag 13).
    IssueUpdated = 13,
    /// Comment created (tag 14).
    CommentCreated = 14,
    /// Review created (tag 15).
    ReviewCreated = 15,
    /// Request collaboration data sync (tag 16).
    SyncCollaborationRequest = 16,
    /// Response with collaboration data (tag 17).
    SyncCollaborationResponse = 17,
}
34
35impl CollaborationMessageType {
36    /// Parse a message type from a byte.
37    pub fn from_byte(b: u8) -> Result<Self> {
38        match b {
39            10 => Ok(CollaborationMessageType::PullRequestCreated),
40            11 => Ok(CollaborationMessageType::PullRequestUpdated),
41            12 => Ok(CollaborationMessageType::IssueCreated),
42            13 => Ok(CollaborationMessageType::IssueUpdated),
43            14 => Ok(CollaborationMessageType::CommentCreated),
44            15 => Ok(CollaborationMessageType::ReviewCreated),
45            16 => Ok(CollaborationMessageType::SyncCollaborationRequest),
46            17 => Ok(CollaborationMessageType::SyncCollaborationResponse),
47            _ => Err(P2PError::InvalidMessage(format!(
48                "unknown collaboration message type: {}",
49                b
50            ))),
51        }
52    }
53}
54
/// Serializable version of a pull request for P2P transmission.
///
/// Built from a `PullRequest` via `From` and turned back into one with
/// `into_pull_request`. The state and commit ids are carried as strings so
/// the struct can derive `Serialize`/`Deserialize` directly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializablePullRequest {
    /// Identifier copied from the pull request's `id` field.
    pub id: u64,
    /// Repository key copied from the pull request's `repo_key` field.
    pub repo_key: String,
    /// Pull request number.
    pub number: u32,
    /// Pull request title.
    pub title: String,
    /// Pull request description.
    pub description: String,
    /// Author of the pull request.
    pub author: String,
    /// State rendered with `to_string()`; `into_pull_request` accepts
    /// `"open"`, `"closed"` and `"merged"` and rejects anything else.
    pub state: String,
    /// Name of the source branch.
    pub source_branch: String,
    /// Name of the target branch.
    pub target_branch: String,
    /// Source commit id as the hex string produced by `to_hex()`.
    pub source_commit: String,
    /// Target commit id as the hex string produced by `to_hex()`.
    pub target_commit: String,
    /// Labels attached to the pull request.
    pub labels: Vec<SerializableLabel>,
    /// Creation timestamp (unit is not visible in this file).
    pub created_at: u64,
    /// Last-update timestamp (unit is not visible in this file).
    pub updated_at: u64,
    /// Merge timestamp, if set on the pull request.
    pub merged_at: Option<u64>,
    /// Who merged the pull request, if set.
    pub merged_by: Option<String>,
}
75
76impl From<PullRequest> for SerializablePullRequest {
77    fn from(pr: PullRequest) -> Self {
78        Self {
79            id: pr.id,
80            repo_key: pr.repo_key,
81            number: pr.number,
82            title: pr.title,
83            description: pr.description,
84            author: pr.author,
85            state: pr.state.to_string(),
86            source_branch: pr.source_branch,
87            target_branch: pr.target_branch,
88            source_commit: pr.source_commit.to_hex(),
89            target_commit: pr.target_commit.to_hex(),
90            labels: pr.labels.into_iter().map(Into::into).collect(),
91            created_at: pr.created_at,
92            updated_at: pr.updated_at,
93            merged_at: pr.merged_at,
94            merged_by: pr.merged_by,
95        }
96    }
97}
98
99impl SerializablePullRequest {
100    /// Convert back to a PullRequest.
101    pub fn into_pull_request(self) -> Result<PullRequest> {
102        let source_commit = ObjectId::from_hex(&self.source_commit)
103            .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
104        let target_commit = ObjectId::from_hex(&self.target_commit)
105            .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
106
107        let state = match self.state.as_str() {
108            "open" => PullRequestState::Open,
109            "closed" => PullRequestState::Closed,
110            "merged" => PullRequestState::Merged,
111            s => return Err(P2PError::InvalidMessage(format!("invalid PR state: {}", s))),
112        };
113
114        let mut pr = PullRequest::new(
115            self.id,
116            self.repo_key,
117            self.number,
118            self.title,
119            self.description,
120            self.author,
121            self.source_branch,
122            self.target_branch,
123            source_commit,
124            target_commit,
125        );
126
127        // Set the stored values
128        pr.id = self.id;
129        pr.number = self.number;
130        pr.state = state;
131        pr.created_at = self.created_at;
132        pr.updated_at = self.updated_at;
133        pr.merged_at = self.merged_at;
134        pr.merged_by = self.merged_by;
135
136        for label in self.labels {
137            pr.labels.push(label.into_label());
138        }
139
140        Ok(pr)
141    }
142}
143
/// Serializable version of an issue for P2P transmission.
///
/// Built from an `Issue` via `From` and turned back into one with
/// `into_issue`. The state is carried as a string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableIssue {
    /// Identifier copied from the issue's `id` field.
    pub id: u64,
    /// Repository key copied from the issue's `repo_key` field.
    pub repo_key: String,
    /// Issue number.
    pub number: u32,
    /// Issue title.
    pub title: String,
    /// Issue description.
    pub description: String,
    /// Author of the issue.
    pub author: String,
    /// State rendered with `to_string()`; `into_issue` accepts `"open"`
    /// and `"closed"` and rejects anything else.
    pub state: String,
    /// Labels attached to the issue.
    pub labels: Vec<SerializableLabel>,
    /// Creation timestamp (unit is not visible in this file).
    pub created_at: u64,
    /// Last-update timestamp (unit is not visible in this file).
    pub updated_at: u64,
    /// Close timestamp, if set on the issue.
    pub closed_at: Option<u64>,
    /// Who closed the issue, if set.
    pub closed_by: Option<String>,
}
160
161impl From<Issue> for SerializableIssue {
162    fn from(issue: Issue) -> Self {
163        Self {
164            id: issue.id,
165            repo_key: issue.repo_key,
166            number: issue.number,
167            title: issue.title,
168            description: issue.description,
169            author: issue.author,
170            state: issue.state.to_string(),
171            labels: issue.labels.into_iter().map(Into::into).collect(),
172            created_at: issue.created_at,
173            updated_at: issue.updated_at,
174            closed_at: issue.closed_at,
175            closed_by: issue.closed_by,
176        }
177    }
178}
179
180impl SerializableIssue {
181    /// Convert back to an Issue.
182    pub fn into_issue(self) -> Result<Issue> {
183        let state = match self.state.as_str() {
184            "open" => IssueState::Open,
185            "closed" => IssueState::Closed,
186            s => {
187                return Err(P2PError::InvalidMessage(format!(
188                    "invalid issue state: {}",
189                    s
190                )))
191            }
192        };
193
194        let mut issue = Issue::new(
195            self.id,
196            self.repo_key,
197            self.number,
198            self.title,
199            self.description,
200            self.author,
201        );
202
203        // Set the stored values
204        issue.id = self.id;
205        issue.number = self.number;
206        issue.state = state;
207        issue.created_at = self.created_at;
208        issue.updated_at = self.updated_at;
209        issue.closed_at = self.closed_at;
210        issue.closed_by = self.closed_by;
211
212        for label in self.labels {
213            issue.labels.push(label.into_label());
214        }
215
216        Ok(issue)
217    }
218}
219
/// Serializable version of a label.
///
/// Mirrors the `name`, `color` and `description` fields of a `Label`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableLabel {
    /// Label name.
    pub name: String,
    /// Label color, stored as a string (the tests in this file use a
    /// six-digit hex value such as `"ff0000"`).
    pub color: String,
    /// Optional description; applied with `with_description` when
    /// converting back to a `Label`.
    pub description: Option<String>,
}
227
228impl From<Label> for SerializableLabel {
229    fn from(label: Label) -> Self {
230        Self {
231            name: label.name,
232            color: label.color,
233            description: label.description,
234        }
235    }
236}
237
238impl SerializableLabel {
239    /// Convert back to a Label.
240    pub fn into_label(self) -> Label {
241        let mut label = Label::new(self.name, self.color);
242        if let Some(desc) = self.description {
243            label = label.with_description(desc);
244        }
245        label
246    }
247}
248
/// Serializable version of a comment.
///
/// The `CommentTarget` of the original comment is flattened into the
/// `target_type`, `repo_key` and `number` fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableComment {
    /// Identifier copied from the comment's `id` field.
    pub id: u64,
    /// Kind of target: `"pull_request"` or `"issue"`. `into_comment`
    /// rejects any other value.
    pub target_type: String,
    /// Repository key taken from the comment target.
    pub repo_key: String,
    /// Pull request or issue number taken from the comment target.
    pub number: u32,
    /// Author of the comment.
    pub author: String,
    /// Comment text.
    pub body: String,
    /// Creation timestamp (unit is not visible in this file).
    pub created_at: u64,
    /// Last-update timestamp (unit is not visible in this file).
    pub updated_at: u64,
}
261
262impl From<Comment> for SerializableComment {
263    fn from(comment: Comment) -> Self {
264        let (target_type, repo_key, number) = match &comment.target {
265            CommentTarget::PullRequest { repo_key, number } => {
266                ("pull_request".to_string(), repo_key.clone(), *number)
267            }
268            CommentTarget::Issue { repo_key, number } => {
269                ("issue".to_string(), repo_key.clone(), *number)
270            }
271        };
272
273        Self {
274            id: comment.id,
275            target_type,
276            repo_key,
277            number,
278            author: comment.author,
279            body: comment.body,
280            created_at: comment.created_at,
281            updated_at: comment.updated_at,
282        }
283    }
284}
285
286impl SerializableComment {
287    /// Convert back to a Comment.
288    pub fn into_comment(self) -> Result<Comment> {
289        let target = match self.target_type.as_str() {
290            "pull_request" => CommentTarget::pull_request(&self.repo_key, self.number),
291            "issue" => CommentTarget::issue(&self.repo_key, self.number),
292            t => {
293                return Err(P2PError::InvalidMessage(format!(
294                    "invalid comment target type: {}",
295                    t
296                )))
297            }
298        };
299
300        let mut comment = Comment::new(self.id, target, self.author, self.body);
301        comment.id = self.id;
302        comment.created_at = self.created_at;
303        comment.updated_at = self.updated_at;
304
305        Ok(comment)
306    }
307}
308
/// Serializable version of a review.
///
/// Built from a `Review` via `From` and turned back into one with
/// `into_review`. The state is carried as a string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableReview {
    /// Identifier copied from the review's `id` field.
    pub id: u64,
    /// Repository key copied from the review's `repo_key` field.
    pub repo_key: String,
    /// Number of the pull request the review belongs to.
    pub pr_number: u32,
    /// Author of the review.
    pub author: String,
    /// State rendered with `to_string()`; `into_review` accepts
    /// `"approved"`, `"changes_requested"`, `"commented"` and `"dismissed"`.
    pub state: String,
    /// Optional review text; applied with `with_body` when converting back.
    pub body: Option<String>,
    /// Commit id copied verbatim from the review (already a string there).
    pub commit_id: String,
    /// Creation timestamp (unit is not visible in this file).
    pub created_at: u64,
}
321
322impl From<Review> for SerializableReview {
323    fn from(review: Review) -> Self {
324        Self {
325            id: review.id,
326            repo_key: review.repo_key,
327            pr_number: review.pr_number,
328            author: review.author,
329            state: review.state.to_string(),
330            body: review.body,
331            commit_id: review.commit_id,
332            created_at: review.created_at,
333        }
334    }
335}
336
337impl SerializableReview {
338    /// Convert back to a Review.
339    pub fn into_review(self) -> Result<Review> {
340        let state = match self.state.as_str() {
341            "approved" => ReviewState::Approved,
342            "changes_requested" => ReviewState::ChangesRequested,
343            "commented" => ReviewState::Commented,
344            "dismissed" => ReviewState::Dismissed,
345            s => {
346                return Err(P2PError::InvalidMessage(format!(
347                    "invalid review state: {}",
348                    s
349                )))
350            }
351        };
352
353        let mut review = Review::new(
354            self.id,
355            self.repo_key,
356            self.pr_number,
357            self.author,
358            state,
359            self.commit_id,
360        );
361
362        if let Some(body) = self.body {
363            review = review.with_body(body);
364        }
365
366        review.id = self.id;
367        review.created_at = self.created_at;
368
369        Ok(review)
370    }
371}
372
/// Collaboration message for P2P transmission.
///
/// Each variant corresponds to one `CollaborationMessageType` tag. Values
/// are converted to and from bytes with `encode` and `decode`.
#[derive(Debug, Clone)]
pub enum CollaborationMessage {
    /// A new pull request was created.
    PullRequestCreated(SerializablePullRequest),
    /// A pull request was updated.
    PullRequestUpdated(SerializablePullRequest),
    /// A new issue was created.
    IssueCreated(SerializableIssue),
    /// An issue was updated.
    IssueUpdated(SerializableIssue),
    /// A new comment was created.
    CommentCreated(SerializableComment),
    /// A new review was created.
    ReviewCreated(SerializableReview),
    /// Request sync of collaboration data for the repository named by
    /// `repo_key`.
    SyncCollaborationRequest { repo_key: String },
    /// Response with collaboration data.
    SyncCollaborationResponse {
        /// Repository key the data belongs to.
        repo_key: String,
        /// Pull requests included in the response.
        pull_requests: Vec<SerializablePullRequest>,
        /// Issues included in the response.
        issues: Vec<SerializableIssue>,
        /// Comments included in the response.
        comments: Vec<SerializableComment>,
        /// Reviews included in the response.
        reviews: Vec<SerializableReview>,
    },
}
399
400impl CollaborationMessage {
401    /// Encode the message to bytes.
402    pub fn encode(&self) -> Bytes {
403        let mut buf = BytesMut::new();
404
405        match self {
406            CollaborationMessage::PullRequestCreated(pr) => {
407                buf.put_u8(CollaborationMessageType::PullRequestCreated as u8);
408                let json = serde_json::to_vec(pr).unwrap();
409                buf.put_u32(json.len() as u32);
410                buf.put_slice(&json);
411            }
412            CollaborationMessage::PullRequestUpdated(pr) => {
413                buf.put_u8(CollaborationMessageType::PullRequestUpdated as u8);
414                let json = serde_json::to_vec(pr).unwrap();
415                buf.put_u32(json.len() as u32);
416                buf.put_slice(&json);
417            }
418            CollaborationMessage::IssueCreated(issue) => {
419                buf.put_u8(CollaborationMessageType::IssueCreated as u8);
420                let json = serde_json::to_vec(issue).unwrap();
421                buf.put_u32(json.len() as u32);
422                buf.put_slice(&json);
423            }
424            CollaborationMessage::IssueUpdated(issue) => {
425                buf.put_u8(CollaborationMessageType::IssueUpdated as u8);
426                let json = serde_json::to_vec(issue).unwrap();
427                buf.put_u32(json.len() as u32);
428                buf.put_slice(&json);
429            }
430            CollaborationMessage::CommentCreated(comment) => {
431                buf.put_u8(CollaborationMessageType::CommentCreated as u8);
432                let json = serde_json::to_vec(comment).unwrap();
433                buf.put_u32(json.len() as u32);
434                buf.put_slice(&json);
435            }
436            CollaborationMessage::ReviewCreated(review) => {
437                buf.put_u8(CollaborationMessageType::ReviewCreated as u8);
438                let json = serde_json::to_vec(review).unwrap();
439                buf.put_u32(json.len() as u32);
440                buf.put_slice(&json);
441            }
442            CollaborationMessage::SyncCollaborationRequest { repo_key } => {
443                buf.put_u8(CollaborationMessageType::SyncCollaborationRequest as u8);
444                let repo_bytes = repo_key.as_bytes();
445                buf.put_u16(repo_bytes.len() as u16);
446                buf.put_slice(repo_bytes);
447            }
448            CollaborationMessage::SyncCollaborationResponse {
449                repo_key,
450                pull_requests,
451                issues,
452                comments,
453                reviews,
454            } => {
455                buf.put_u8(CollaborationMessageType::SyncCollaborationResponse as u8);
456
457                // Repo key
458                let repo_bytes = repo_key.as_bytes();
459                buf.put_u16(repo_bytes.len() as u16);
460                buf.put_slice(repo_bytes);
461
462                // PRs
463                let pr_json = serde_json::to_vec(pull_requests).unwrap();
464                buf.put_u32(pr_json.len() as u32);
465                buf.put_slice(&pr_json);
466
467                // Issues
468                let issue_json = serde_json::to_vec(issues).unwrap();
469                buf.put_u32(issue_json.len() as u32);
470                buf.put_slice(&issue_json);
471
472                // Comments
473                let comment_json = serde_json::to_vec(comments).unwrap();
474                buf.put_u32(comment_json.len() as u32);
475                buf.put_slice(&comment_json);
476
477                // Reviews
478                let review_json = serde_json::to_vec(reviews).unwrap();
479                buf.put_u32(review_json.len() as u32);
480                buf.put_slice(&review_json);
481            }
482        }
483
484        buf.freeze()
485    }
486
487    /// Decode a message from bytes.
488    pub fn decode(data: &[u8]) -> Result<Self> {
489        if data.is_empty() {
490            return Err(P2PError::InvalidMessage(
491                "empty collaboration message".into(),
492            ));
493        }
494
495        let msg_type = CollaborationMessageType::from_byte(data[0])?;
496        let mut payload = &data[1..];
497
498        match msg_type {
499            CollaborationMessageType::PullRequestCreated => {
500                let len = read_u32(&mut payload)?;
501                let pr: SerializablePullRequest = serde_json::from_slice(&payload[..len as usize])
502                    .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
503                Ok(CollaborationMessage::PullRequestCreated(pr))
504            }
505            CollaborationMessageType::PullRequestUpdated => {
506                let len = read_u32(&mut payload)?;
507                let pr: SerializablePullRequest = serde_json::from_slice(&payload[..len as usize])
508                    .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
509                Ok(CollaborationMessage::PullRequestUpdated(pr))
510            }
511            CollaborationMessageType::IssueCreated => {
512                let len = read_u32(&mut payload)?;
513                let issue: SerializableIssue = serde_json::from_slice(&payload[..len as usize])
514                    .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
515                Ok(CollaborationMessage::IssueCreated(issue))
516            }
517            CollaborationMessageType::IssueUpdated => {
518                let len = read_u32(&mut payload)?;
519                let issue: SerializableIssue = serde_json::from_slice(&payload[..len as usize])
520                    .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
521                Ok(CollaborationMessage::IssueUpdated(issue))
522            }
523            CollaborationMessageType::CommentCreated => {
524                let len = read_u32(&mut payload)?;
525                let comment: SerializableComment = serde_json::from_slice(&payload[..len as usize])
526                    .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
527                Ok(CollaborationMessage::CommentCreated(comment))
528            }
529            CollaborationMessageType::ReviewCreated => {
530                let len = read_u32(&mut payload)?;
531                let review: SerializableReview =
532                    serde_json::from_slice(&payload[..len as usize])
533                        .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
534                Ok(CollaborationMessage::ReviewCreated(review))
535            }
536            CollaborationMessageType::SyncCollaborationRequest => {
537                let repo_len = read_u16(&mut payload)?;
538                let repo_key = String::from_utf8(payload[..repo_len as usize].to_vec())
539                    .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
540                Ok(CollaborationMessage::SyncCollaborationRequest { repo_key })
541            }
542            CollaborationMessageType::SyncCollaborationResponse => {
543                // Repo key
544                let repo_len = read_u16(&mut payload)?;
545                let repo_key = String::from_utf8(payload[..repo_len as usize].to_vec())
546                    .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
547                payload = &payload[repo_len as usize..];
548
549                // PRs
550                let pr_len = read_u32(&mut payload)?;
551                let pull_requests: Vec<SerializablePullRequest> =
552                    serde_json::from_slice(&payload[..pr_len as usize])
553                        .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
554                payload = &payload[pr_len as usize..];
555
556                // Issues
557                let issue_len = read_u32(&mut payload)?;
558                let issues: Vec<SerializableIssue> =
559                    serde_json::from_slice(&payload[..issue_len as usize])
560                        .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
561                payload = &payload[issue_len as usize..];
562
563                // Comments
564                let comment_len = read_u32(&mut payload)?;
565                let comments: Vec<SerializableComment> =
566                    serde_json::from_slice(&payload[..comment_len as usize])
567                        .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
568                payload = &payload[comment_len as usize..];
569
570                // Reviews
571                let review_len = read_u32(&mut payload)?;
572                let reviews: Vec<SerializableReview> =
573                    serde_json::from_slice(&payload[..review_len as usize])
574                        .map_err(|e| P2PError::InvalidMessage(e.to_string()))?;
575
576                Ok(CollaborationMessage::SyncCollaborationResponse {
577                    repo_key,
578                    pull_requests,
579                    issues,
580                    comments,
581                    reviews,
582                })
583            }
584        }
585    }
586}
587
588fn read_u16(buf: &mut &[u8]) -> Result<u16> {
589    if buf.remaining() < 2 {
590        return Err(P2PError::InvalidMessage("truncated u16".into()));
591    }
592    Ok(buf.get_u16())
593}
594
595fn read_u32(buf: &mut &[u8]) -> Result<u32> {
596    if buf.remaining() < 4 {
597        return Err(P2PError::InvalidMessage("truncated u32".into()));
598    }
599    Ok(buf.get_u32())
600}
601
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode `msg`, then decode the resulting bytes; panics if decoding fails.
    fn roundtrip(msg: &CollaborationMessage) -> CollaborationMessage {
        let wire = msg.encode();
        CollaborationMessage::decode(&wire).unwrap()
    }

    /// An open, label-free pull request fixture.
    fn sample_pull_request() -> SerializablePullRequest {
        SerializablePullRequest {
            id: 1,
            repo_key: "alice/repo".to_string(),
            number: 1,
            title: "Add feature".to_string(),
            description: "Description".to_string(),
            author: "alice".to_string(),
            state: "open".to_string(),
            source_branch: "feature".to_string(),
            target_branch: "main".to_string(),
            source_commit: "0".repeat(40),
            target_commit: "1".repeat(40),
            labels: vec![],
            created_at: 12345,
            updated_at: 12345,
            merged_at: None,
            merged_by: None,
        }
    }

    /// An open issue fixture carrying a single label.
    fn sample_issue() -> SerializableIssue {
        let bug_label = SerializableLabel {
            name: "bug".to_string(),
            color: "ff0000".to_string(),
            description: Some("A bug".to_string()),
        };

        SerializableIssue {
            id: 2,
            repo_key: "bob/project".to_string(),
            number: 5,
            title: "Bug report".to_string(),
            description: "Steps to reproduce".to_string(),
            author: "bob".to_string(),
            state: "open".to_string(),
            labels: vec![bug_label],
            created_at: 54321,
            updated_at: 54321,
            closed_at: None,
            closed_by: None,
        }
    }

    #[test]
    fn test_pr_message_roundtrip() {
        let expected = sample_pull_request();
        let decoded = roundtrip(&CollaborationMessage::PullRequestCreated(expected.clone()));

        if let CollaborationMessage::PullRequestCreated(got) = decoded {
            assert_eq!(got.id, expected.id);
            assert_eq!(got.title, expected.title);
            assert_eq!(got.number, expected.number);
        } else {
            panic!("wrong message type");
        }
    }

    #[test]
    fn test_issue_message_roundtrip() {
        let expected = sample_issue();
        let decoded = roundtrip(&CollaborationMessage::IssueCreated(expected.clone()));

        if let CollaborationMessage::IssueCreated(got) = decoded {
            assert_eq!(got.id, expected.id);
            assert_eq!(got.title, expected.title);
            assert_eq!(got.labels.len(), 1);
        } else {
            panic!("wrong message type");
        }
    }

    #[test]
    fn test_sync_request_roundtrip() {
        let request = CollaborationMessage::SyncCollaborationRequest {
            repo_key: "carol/test".to_string(),
        };

        if let CollaborationMessage::SyncCollaborationRequest { repo_key } = roundtrip(&request) {
            assert_eq!(repo_key, "carol/test");
        } else {
            panic!("wrong message type");
        }
    }
}