1use std::fmt::Write as _;
7
8use panproto_vcs::{HeadState, Object, ObjectId, Store};
9use reqwest::Client;
10use serde::{Deserialize, Serialize};
11
12use crate::error::XrpcError;
13
/// HTTP client bound to one repository on a panproto node.
///
/// Every request made through this client targets `base_url` and identifies
/// the repository by the stored `did` and `repo` values.
#[derive(Debug, Clone)]
pub struct NodeClient {
    /// Base URL of the node; `NodeClient::new` stores it without trailing `/`.
    base_url: String,
    /// Value sent as the `did` parameter of every request.
    did: String,
    /// Value sent as the `repo` parameter of every request.
    repo: String,
    /// Bearer token; `put_object` and `set_ref` refuse to run while this is `None`.
    token: Option<String>,
    /// Underlying `reqwest` client used for all requests.
    http: Client,
}
28
/// Response body of the `negotiate` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct NegotiateResult {
    /// Hex-encoded object IDs that still have to be transferred; `push` and
    /// `pull` parse each entry with `parse_object_id`.
    pub need: Vec<String>,
    /// String pairs returned by the node. Not read anywhere in this file —
    /// presumably `(ref name, target)`; TODO confirm against the node.
    pub refs: Vec<(String, String)>,
}
37
/// Response body of the `getRepoInfo` endpoint.
///
/// NOTE(review): unlike the other response types in this file there is no
/// `rename_all = "camelCase"`, so the JSON keys are `default_branch` and
/// `commit_count` exactly as spelled here — confirm this matches the node.
#[derive(Debug, Serialize, Deserialize)]
pub struct RepoInfo {
    /// Protocol name reported for the repository.
    pub protocol: String,
    /// Name of the repository's default branch.
    pub default_branch: String,
    /// Commit count reported by the node.
    pub commit_count: u64,
}
48
/// Identity attached to a commit (used for `author` and `committer`).
///
/// Serialized with camelCase keys.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommitIdentity {
    /// Display name.
    pub name: String,
    /// Optional e-mail address; the key is omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub email: Option<String>,
}
59
/// One commit as listed by the `listCommits` endpoint.
///
/// Serialized with camelCase keys (for example `tree_oid` becomes `treeOid`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommitEntry {
    /// Identifier of this commit, kept as the string the node sent.
    pub oid: String,
    /// Identifiers of the parent commits.
    pub parents: Vec<String>,
    /// Summary text of the commit (presumably the first message line — TODO confirm).
    pub summary: String,
    /// Commit message.
    pub message: String,
    /// Author identity.
    pub author: CommitIdentity,
    /// Committer identity; the key is omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub committer: Option<CommitIdentity>,
    /// Commit timestamp as an unsigned integer; the unit is not visible here
    /// (the test fixture looks like Unix seconds — TODO confirm).
    pub timestamp: u64,
    /// Identifier of the commit's tree, if provided; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tree_oid: Option<String>,
}
83
/// Response body of the `listCommits` endpoint, as returned by
/// `NodeClient::list_commits`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListCommitsResult {
    /// The listed commits.
    pub commits: Vec<CommitEntry>,
    /// Count reported by the node (presumably the number of entries in `commits` — TODO confirm).
    pub count: u64,
    /// Optional starting point reported by the node; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub start: Option<String>,
}
96
/// Per-file entry of a `diffCommits` response.
///
/// Serialized with camelCase keys; every `Option` field is omitted from
/// serialized output when it is `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FileDiff {
    /// Path of the file.
    pub path: String,
    /// Previous path, when the node reports one.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub old_path: Option<String>,
    /// Change status as a free-form string (the test fixture uses `"added"`).
    pub status: String,
    /// Identifier of the old version of the file, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub old_oid: Option<String>,
    /// Identifier of the new version of the file, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub new_oid: Option<String>,
    /// Number of additions reported for this file.
    pub additions: u64,
    /// Number of deletions reported for this file.
    pub deletions: u64,
    /// Whether the node flagged the file as binary.
    pub binary: bool,
    /// Diff hunks, kept as untyped JSON values.
    pub hunks: Vec<serde_json::Value>,
    /// Optional structural diff, kept as an untyped JSON value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub structural_diff: Option<serde_json::Value>,
}
126
/// Response body of the `diffCommits` endpoint, as returned by
/// `NodeClient::diff_commits`.
///
/// Serialized with camelCase keys (for example `total_additions` becomes
/// `totalAdditions`).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DiffCommitsResult {
    /// The `from` side of the comparison, as reported by the node.
    pub from: String,
    /// The `to` side of the comparison, as reported by the node.
    pub to: String,
    /// Per-file differences.
    pub files: Vec<FileDiff>,
    /// Total additions reported by the node.
    pub total_additions: u64,
    /// Total deletions reported by the node.
    pub total_deletions: u64,
    /// File count reported by the node.
    pub file_count: u64,
}
144
145impl NodeClient {
146 #[must_use]
148 pub fn new(base_url: &str, did: &str, repo: &str) -> Self {
149 Self {
150 base_url: base_url.trim_end_matches('/').to_owned(),
151 did: did.to_owned(),
152 repo: repo.to_owned(),
153 token: None,
154 http: Client::new(),
155 }
156 }
157
158 #[must_use]
160 pub fn with_token(mut self, token: &str) -> Self {
161 self.token = Some(token.to_owned());
162 self
163 }
164
165 pub fn from_url(url: &str) -> Result<Self, XrpcError> {
176 let path = url
177 .strip_prefix("panproto://")
178 .or_else(|| url.strip_prefix("cospan://"))
179 .ok_or_else(|| {
180 XrpcError::InvalidUrl(format!("expected panproto:// or cospan:// prefix: {url}"))
181 })?;
182
183 let parts: Vec<&str> = path.splitn(2, '/').collect();
184 if parts.len() != 2 {
185 return Err(XrpcError::InvalidUrl(format!(
186 "expected panproto://did/repo: {url}"
187 )));
188 }
189
190 let base = std::env::var("PANPROTO_NODE_URL")
191 .or_else(|_| std::env::var("COSPAN_NODE_URL"))
192 .unwrap_or_else(|_| "https://node.panproto.dev".to_owned());
193
194 Ok(Self::new(&base, parts[0], parts[1]))
195 }
196
197 pub async fn get_object(&self, id: &ObjectId) -> Result<Object, XrpcError> {
205 let url = format!(
206 "{}/xrpc/dev.panproto.node.getObject?did={}&repo={}&id={}",
207 self.base_url, self.did, self.repo, id
208 );
209 let resp = self.http.get(&url).send().await?;
210 let status = resp.status();
211 if !status.is_success() {
212 let body = resp.text().await.unwrap_or_default();
213 return Err(XrpcError::NodeError {
214 endpoint: "getObject".to_owned(),
215 status: status.as_u16(),
216 body,
217 });
218 }
219 let bytes = resp.bytes().await?;
220 let obj: Object = rmp_serde::from_slice(&bytes)?;
221 Ok(obj)
222 }
223
224 pub async fn get_ref(&self, ref_name: &str) -> Result<Option<ObjectId>, XrpcError> {
230 let url = format!(
231 "{}/xrpc/dev.panproto.node.getRef?did={}&repo={}&ref={}",
232 self.base_url, self.did, self.repo, ref_name
233 );
234 let resp = self.http.get(&url).send().await?;
235 let status = resp.status();
236 if status.as_u16() == 404 {
237 return Ok(None);
238 }
239 if !status.is_success() {
240 let body = resp.text().await.unwrap_or_default();
241 return Err(XrpcError::NodeError {
242 endpoint: "getRef".to_owned(),
243 status: status.as_u16(),
244 body,
245 });
246 }
247 let body: serde_json::Value = resp.json().await?;
248 let id_str = body["target"]
249 .as_str()
250 .ok_or_else(|| XrpcError::NodeError {
251 endpoint: "getRef".to_owned(),
252 status: 200,
253 body: "missing target field".to_owned(),
254 })?;
255 Ok(Some(parse_object_id(id_str)?))
256 }
257
258 pub async fn list_refs(&self) -> Result<Vec<(String, ObjectId)>, XrpcError> {
264 let url = format!(
265 "{}/xrpc/dev.panproto.node.listRefs?did={}&repo={}",
266 self.base_url, self.did, self.repo
267 );
268 let resp = self.http.get(&url).send().await?;
269 let resp = check_response(resp, "listRefs").await?;
270 let body: serde_json::Value = resp.json().await?;
271 let refs = body["refs"]
272 .as_array()
273 .ok_or_else(|| XrpcError::NodeError {
274 endpoint: "listRefs".to_owned(),
275 status: 200,
276 body: "missing refs array".to_owned(),
277 })?;
278 let mut result = Vec::new();
279 for (i, r) in refs.iter().enumerate() {
280 let name = r["name"].as_str().ok_or_else(|| XrpcError::NodeError {
281 endpoint: "listRefs".to_owned(),
282 status: 200,
283 body: format!("ref entry {i} missing 'name' field"),
284 })?;
285 let target = r["target"].as_str().ok_or_else(|| XrpcError::NodeError {
286 endpoint: "listRefs".to_owned(),
287 status: 200,
288 body: format!("ref entry {i} ('{name}') missing 'target' field"),
289 })?;
290 result.push((name.to_owned(), parse_object_id(target)?));
291 }
292 Ok(result)
293 }
294
295 pub async fn get_head(&self) -> Result<HeadState, XrpcError> {
301 let url = format!(
302 "{}/xrpc/dev.panproto.node.getHead?did={}&repo={}",
303 self.base_url, self.did, self.repo
304 );
305 let resp = self.http.get(&url).send().await?;
306 let resp = check_response(resp, "getHead").await?;
307 let body: serde_json::Value = resp.json().await?;
308 if let Some(branch) = body["branch"].as_str() {
309 Ok(HeadState::Branch(branch.to_owned()))
310 } else if let Some(id_str) = body["detached"].as_str() {
311 Ok(HeadState::Detached(parse_object_id(id_str)?))
312 } else {
313 Err(XrpcError::NodeError {
314 endpoint: "getHead".to_owned(),
315 status: 200,
316 body: format!(
317 "unexpected HEAD response: neither 'branch' nor 'detached' field present: {body}"
318 ),
319 })
320 }
321 }
322
323 pub async fn get_repo_info(&self) -> Result<RepoInfo, XrpcError> {
329 let url = format!(
330 "{}/xrpc/dev.panproto.node.getRepoInfo?did={}&repo={}",
331 self.base_url, self.did, self.repo
332 );
333 let resp = self.http.get(&url).send().await?;
334 let resp = check_response(resp, "getRepoInfo").await?;
335 let info: RepoInfo = resp.json().await?;
336 Ok(info)
337 }
338
339 pub async fn list_commits(
348 &self,
349 git_ref: Option<&str>,
350 limit: Option<u32>,
351 ) -> Result<ListCommitsResult, XrpcError> {
352 let url = build_list_commits_url(&self.base_url, &self.did, &self.repo, git_ref, limit);
353 let resp = self.http.get(&url).send().await?;
354 let resp = check_response(resp, "listCommits").await?;
355 let result: ListCommitsResult = resp.json().await?;
356 Ok(result)
357 }
358
359 pub async fn diff_commits(
369 &self,
370 from: &str,
371 to: &str,
372 context_lines: Option<u32>,
373 ) -> Result<DiffCommitsResult, XrpcError> {
374 let url = build_diff_commits_url(
375 &self.base_url,
376 &self.did,
377 &self.repo,
378 from,
379 to,
380 context_lines,
381 );
382 let resp = self.http.get(&url).send().await?;
383 let resp = check_response(resp, "diffCommits").await?;
384 let result: DiffCommitsResult = resp.json().await?;
385 Ok(result)
386 }
387
388 pub async fn put_object(&self, object: &Object) -> Result<ObjectId, XrpcError> {
397 let token = self
398 .token
399 .as_ref()
400 .ok_or_else(|| XrpcError::AuthRequired("putObject requires auth".to_owned()))?;
401
402 let url = format!(
403 "{}/xrpc/dev.panproto.node.putObject?did={}&repo={}",
404 self.base_url, self.did, self.repo
405 );
406 let body = rmp_serde::to_vec(object)?;
407 let resp = self
408 .http
409 .post(&url)
410 .header("Authorization", format!("Bearer {token}"))
411 .header("Content-Type", "application/msgpack")
412 .body(body)
413 .send()
414 .await?;
415 check_status_owned(resp, "putObject").await
416 }
417
418 pub async fn set_ref(
424 &self,
425 ref_name: &str,
426 old_target: Option<&ObjectId>,
427 new_target: &ObjectId,
428 protocol: &str,
429 commit_count: u64,
430 ) -> Result<(), XrpcError> {
431 let token = self
432 .token
433 .as_ref()
434 .ok_or_else(|| XrpcError::AuthRequired("setRef requires auth".to_owned()))?;
435
436 let url = format!("{}/xrpc/dev.panproto.node.setRef", self.base_url);
437 let body = serde_json::json!({
438 "did": self.did,
439 "repo": self.repo,
440 "ref": ref_name,
441 "oldTarget": old_target.map(ToString::to_string),
442 "newTarget": new_target.to_string(),
443 "protocol": protocol,
444 "commitCount": commit_count,
445 });
446 let resp = self
447 .http
448 .post(&url)
449 .header("Authorization", format!("Bearer {token}"))
450 .json(&body)
451 .send()
452 .await?;
453 if !resp.status().is_success() {
454 let status = resp.status().as_u16();
455 let body = resp.text().await.unwrap_or_default();
456 return Err(XrpcError::NodeError {
457 endpoint: "setRef".to_owned(),
458 status,
459 body,
460 });
461 }
462 Ok(())
463 }
464
465 pub async fn negotiate(
474 &self,
475 have: &[ObjectId],
476 want: &[String],
477 ) -> Result<NegotiateResult, XrpcError> {
478 let url = format!("{}/xrpc/dev.panproto.node.negotiate", self.base_url);
479 let body = serde_json::json!({
480 "did": self.did,
481 "repo": self.repo,
482 "have": have.iter().map(ObjectId::to_string).collect::<Vec<_>>(),
483 "want": want,
484 });
485 let mut req = self.http.post(&url).json(&body);
486 if let Some(token) = &self.token {
487 req = req.header("Authorization", format!("Bearer {token}"));
488 }
489 let resp = req.send().await?;
490 if !resp.status().is_success() {
491 let status = resp.status().as_u16();
492 let body = resp.text().await.unwrap_or_default();
493 return Err(XrpcError::NodeError {
494 endpoint: "negotiate".to_owned(),
495 status,
496 body,
497 });
498 }
499 let result: NegotiateResult = resp.json().await?;
500 Ok(result)
501 }
502
503 pub async fn push<S: Store>(&self, store: &S) -> Result<PushResult, XrpcError> {
514 let local_refs = store.list_refs("refs/")?;
516 if local_refs.is_empty() {
517 return Ok(PushResult {
518 objects_pushed: 0,
519 refs_updated: 0,
520 });
521 }
522
523 let local_ids: Vec<ObjectId> = store.list_objects()?.into_iter().collect();
525 let want_refs: Vec<String> = local_refs.iter().map(|(name, _)| name.clone()).collect();
526
527 let negotiation = self.negotiate(&local_ids, &want_refs).await?;
529
530 let mut objects_pushed = 0;
532 for id_str in &negotiation.need {
533 let id = parse_object_id(id_str)?;
534 let obj = store.get(&id)?;
535 self.put_object(&obj).await?;
536 objects_pushed += 1;
537 }
538
539 let mut refs_updated = 0;
541 for (name, id) in &local_refs {
542 let remote_target = self.get_ref(name).await?;
543
544 let (protocol, commit_count) = match store.get(id) {
546 Ok(Object::Commit(c)) => {
547 let count = count_ancestors(store, id);
548 (c.protocol.clone(), count)
549 }
550 _ => ("project".to_owned(), 1),
551 };
552
553 self.set_ref(name, remote_target.as_ref(), id, &protocol, commit_count)
554 .await?;
555 refs_updated += 1;
556 }
557
558 Ok(PushResult {
559 objects_pushed,
560 refs_updated,
561 })
562 }
563
564 pub async fn pull<S: Store>(&self, store: &mut S) -> Result<PullResult, XrpcError> {
573 let remote_refs = self.list_refs().await?;
575 if remote_refs.is_empty() {
576 return Ok(PullResult {
577 objects_fetched: 0,
578 refs_updated: 0,
579 });
580 }
581
582 let local_ids: Vec<ObjectId> = store.list_objects()?.into_iter().collect();
584 let want_refs: Vec<String> = remote_refs.iter().map(|(name, _)| name.clone()).collect();
585
586 let negotiation = self.negotiate(&local_ids, &want_refs).await?;
588
589 let mut objects_fetched = 0;
591 for id_str in &negotiation.need {
592 let id = parse_object_id(id_str)?;
593 let obj = self.get_object(&id).await?;
594 store.put(&obj)?;
595 objects_fetched += 1;
596 }
597
598 let mut refs_updated = 0;
600 for (name, id) in &remote_refs {
601 store.set_ref(name, *id)?;
602 refs_updated += 1;
603 }
604
605 Ok(PullResult {
606 objects_fetched,
607 refs_updated,
608 })
609 }
610}
611
/// Summary returned by `NodeClient::push`.
#[derive(Debug)]
pub struct PushResult {
    /// Number of objects uploaded with `put_object` during the push.
    pub objects_pushed: usize,
    /// Number of refs for which `set_ref` succeeded during the push.
    pub refs_updated: usize,
}
620
/// Summary returned by `NodeClient::pull`.
#[derive(Debug)]
pub struct PullResult {
    /// Number of objects downloaded and written to the local store.
    pub objects_fetched: usize,
    /// Number of refs set in the local store.
    pub refs_updated: usize,
}
629
630fn count_ancestors<S: Store>(store: &S, start: &ObjectId) -> u64 {
634 let mut count = 0;
635 let mut stack = vec![*start];
636 let mut visited = std::collections::HashSet::new();
637 while let Some(id) = stack.pop() {
638 if !visited.insert(id) {
639 continue;
640 }
641 count += 1;
642 if let Ok(Object::Commit(c)) = store.get(&id) {
643 stack.extend_from_slice(&c.parents);
644 }
645 }
646 count
647}
648
/// Percent-encodes `value` so it can be used as a single query-string value.
///
/// RFC 3986 unreserved characters are kept, and so are `:` and `/`, which are
/// legal inside a query and keep DIDs (`did:plc:…`) and ref paths
/// (`refs/heads/…`) readable. Every other byte — including `&`, `=`, `#`, `+`,
/// `%`, spaces and non-ASCII UTF-8 bytes — becomes `%XX`.
fn encode_query_value(value: &str) -> String {
    let mut encoded = String::with_capacity(value.len());
    for byte in value.bytes() {
        match byte {
            b'A'..=b'Z'
            | b'a'..=b'z'
            | b'0'..=b'9'
            | b'-'
            | b'.'
            | b'_'
            | b'~'
            | b':'
            | b'/' => encoded.push(char::from(byte)),
            _ => {
                // Writing into a `String` cannot fail.
                let _ = write!(encoded, "%{byte:02X}");
            }
        }
    }
    encoded
}

/// Builds the request URL for the `listCommits` endpoint.
///
/// `did` and `repo` are always present; `ref` and `limit` are appended (in that
/// order) only when given. `base_url` is used verbatim and is expected to have
/// no trailing `/`. All string values are percent-encoded so that reserved
/// characters cannot split or truncate the query string.
fn build_list_commits_url(
    base_url: &str,
    did: &str,
    repo: &str,
    git_ref: Option<&str>,
    limit: Option<u32>,
) -> String {
    let mut url = format!(
        "{base_url}/xrpc/dev.panproto.node.listCommits?did={}&repo={}",
        encode_query_value(did),
        encode_query_value(repo)
    );
    if let Some(r) = git_ref {
        let _ = write!(url, "&ref={}", encode_query_value(r));
    }
    if let Some(n) = limit {
        let _ = write!(url, "&limit={n}");
    }
    url
}

/// Builds the request URL for the `diffCommits` endpoint.
///
/// `did`, `repo`, `from` and `to` are always present; `contextLines` is
/// appended only when given. `base_url` is used verbatim and is expected to
/// have no trailing `/`. All string values are percent-encoded so that reserved
/// characters cannot split or truncate the query string.
fn build_diff_commits_url(
    base_url: &str,
    did: &str,
    repo: &str,
    from: &str,
    to: &str,
    context_lines: Option<u32>,
) -> String {
    let mut url = format!(
        "{base_url}/xrpc/dev.panproto.node.diffCommits?did={}&repo={}&from={}&to={}",
        encode_query_value(did),
        encode_query_value(repo),
        encode_query_value(from),
        encode_query_value(to)
    );
    if let Some(ctx) = context_lines {
        let _ = write!(url, "&contextLines={ctx}");
    }
    url
}
687
688fn parse_object_id(hex: &str) -> Result<ObjectId, XrpcError> {
690 let bytes =
691 hex::decode(hex).map_err(|e| XrpcError::InvalidUrl(format!("bad object ID: {e}")))?;
692 if bytes.len() != 32 {
693 return Err(XrpcError::InvalidUrl(format!(
694 "object ID must be 32 bytes, got {}",
695 bytes.len()
696 )));
697 }
698 let mut arr = [0u8; 32];
699 arr.copy_from_slice(&bytes);
700 Ok(ObjectId::from_bytes(arr))
701}
702
703async fn check_response(
705 resp: reqwest::Response,
706 endpoint: &str,
707) -> Result<reqwest::Response, XrpcError> {
708 if resp.status().is_success() {
709 return Ok(resp);
710 }
711 let status = resp.status().as_u16();
712 let body = resp.text().await.unwrap_or_default();
713 Err(XrpcError::NodeError {
714 endpoint: endpoint.to_owned(),
715 status,
716 body,
717 })
718}
719
720async fn check_status_owned(
722 resp: reqwest::Response,
723 endpoint: &str,
724) -> Result<ObjectId, XrpcError> {
725 if !resp.status().is_success() {
726 let status = resp.status().as_u16();
727 let body = resp.text().await.unwrap_or_default();
728 return Err(XrpcError::NodeError {
729 endpoint: endpoint.to_owned(),
730 status,
731 body,
732 });
733 }
734 let body: serde_json::Value = resp.json().await?;
735 let id_str = body["id"].as_str().ok_or_else(|| XrpcError::NodeError {
736 endpoint: endpoint.to_owned(),
737 status: 200,
738 body: "missing id field in putObject response".to_owned(),
739 })?;
740 parse_object_id(id_str)
741}
742
#[cfg(test)]
mod tests {
    use super::*;

    /// `ListCommitsResult` must serialize with camelCase keys and survive a
    /// JSON round trip.
    #[test]
    fn list_commits_result_camel_case_roundtrip() -> Result<(), serde_json::Error> {
        let result = ListCommitsResult {
            commits: vec![CommitEntry {
                oid: "abc123".to_owned(),
                parents: vec!["def456".to_owned()],
                summary: "initial commit".to_owned(),
                message: "initial commit\n\nwith body".to_owned(),
                author: CommitIdentity {
                    name: "Alice".to_owned(),
                    email: Some("alice@example.com".to_owned()),
                },
                committer: None,
                timestamp: 1_712_345_678,
                tree_oid: Some("fff000".to_owned()),
            }],
            count: 1,
            start: Some("abc123".to_owned()),
        };
        let json = serde_json::to_value(&result)?;
        // The camelCase key is present, the snake_case spelling is not.
        assert!(json["commits"][0]["treeOid"].is_string());
        assert!(json["commits"][0]["tree_oid"].is_null());
        let roundtrip: ListCommitsResult = serde_json::from_value(json)?;
        assert_eq!(roundtrip.commits[0].oid, "abc123");
        assert_eq!(roundtrip.commits[0].tree_oid.as_deref(), Some("fff000"));
        Ok(())
    }

    /// `DiffCommitsResult` must serialize with camelCase keys, omit `None`
    /// fields, and survive a JSON round trip.
    #[test]
    fn diff_commits_result_camel_case_roundtrip() -> Result<(), serde_json::Error> {
        let result = DiffCommitsResult {
            from: "aaa".to_owned(),
            to: "bbb".to_owned(),
            files: vec![FileDiff {
                path: "schemas/core.json".to_owned(),
                old_path: None,
                status: "added".to_owned(),
                old_oid: None,
                new_oid: Some("ccc".to_owned()),
                additions: 12,
                deletions: 0,
                binary: false,
                hunks: vec![],
                structural_diff: Some(serde_json::json!({"added_vertices": ["Foo"]})),
            }],
            total_additions: 12,
            total_deletions: 0,
            file_count: 1,
        };
        let json = serde_json::to_value(&result)?;
        assert!(json["totalAdditions"].is_number());
        assert!(json["total_additions"].is_null());
        // `old_path` is `None`, so the key is skipped entirely.
        assert!(json["files"][0]["oldPath"].is_null());
        assert!(json["files"][0]["structuralDiff"].is_object());
        let roundtrip: DiffCommitsResult = serde_json::from_value(json)?;
        assert_eq!(roundtrip.total_additions, 12);
        assert_eq!(roundtrip.files[0].path, "schemas/core.json");
        Ok(())
    }

    /// With no optional parameters only `did` and `repo` appear in the query.
    #[test]
    fn list_commits_url_minimal_required_params_only() {
        let url = build_list_commits_url(
            "https://node.example.com",
            "did:plc:abc",
            "myrepo",
            None,
            None,
        );
        assert_eq!(
            url,
            "https://node.example.com/xrpc/dev.panproto.node.listCommits?did=did:plc:abc&repo=myrepo"
        );
    }

    /// A ref without a limit adds `ref` and nothing else.
    #[test]
    fn list_commits_url_with_ref_only() {
        let url = build_list_commits_url(
            "https://node.example.com",
            "did:plc:abc",
            "myrepo",
            Some("refs/heads/main"),
            None,
        );
        assert!(url.ends_with("&ref=refs/heads/main"));
        assert!(!url.contains("&limit="));
    }

    /// A limit without a ref adds `limit` and nothing else.
    #[test]
    fn list_commits_url_with_limit_only() {
        let url = build_list_commits_url(
            "https://node.example.com",
            "did:plc:abc",
            "myrepo",
            None,
            Some(100),
        );
        assert!(url.ends_with("&limit=100"));
        assert!(!url.contains("&ref="));
    }

    /// When both are given, `ref` comes before `limit`.
    #[test]
    fn list_commits_url_with_ref_and_limit() {
        let url = build_list_commits_url(
            "https://node.example.com",
            "did:plc:abc",
            "myrepo",
            Some("feature"),
            Some(25),
        );
        assert_eq!(
            url,
            "https://node.example.com/xrpc/dev.panproto.node.listCommits?did=did:plc:abc&repo=myrepo&ref=feature&limit=25"
        );
    }

    /// Without context lines the diff URL carries only the required parameters.
    #[test]
    fn diff_commits_url_without_context_lines() {
        let url = build_diff_commits_url(
            "https://node.example.com",
            "did:plc:abc",
            "myrepo",
            "deadbeef",
            "cafef00d",
            None,
        );
        assert_eq!(
            url,
            "https://node.example.com/xrpc/dev.panproto.node.diffCommits?did=did:plc:abc&repo=myrepo&from=deadbeef&to=cafef00d"
        );
    }

    /// Context lines are appended as `contextLines`.
    #[test]
    fn diff_commits_url_with_context_lines() {
        let url = build_diff_commits_url(
            "https://node.example.com",
            "did:plc:abc",
            "myrepo",
            "deadbeef",
            "cafef00d",
            Some(5),
        );
        assert!(url.ends_with("&contextLines=5"));
    }

    /// A base URL given with a trailing slash must not produce `//` in the
    /// request path.
    #[test]
    fn list_commits_url_strips_trailing_slash_from_base() {
        // The URL builder uses `base_url` verbatim, so the stripping has to come
        // from `NodeClient::new`. Route the base URL through the constructor so
        // the test actually exercises a trailing slash.
        let client = NodeClient::new("https://node.example.com/", "did:plc:abc", "r");
        assert_eq!(client.base_url, "https://node.example.com");

        let url = build_list_commits_url(&client.base_url, &client.did, &client.repo, None, None);
        let Some(after_scheme) = url.strip_prefix("https://") else {
            panic!("url should start with https://: {url}");
        };
        assert!(
            !after_scheme.contains("//"),
            "url should not contain consecutive slashes: {url}"
        );
    }
}