1mod identity;
47mod repo_metadata;
48
49use crate::runtime::block_on_result;
50use anyhow::{Context, Result};
51use futures::{SinkExt, StreamExt};
52use hashtree_blossom::BlossomClient;
53use hashtree_core::{decode_tree_node, decrypt_chk, LinkType};
54use nostr_sdk::prelude::*;
55use serde::Deserialize;
56use std::collections::HashMap;
57use std::time::Duration;
58use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
59use tracing::{debug, info, warn};
60
61#[cfg(test)]
62use identity::resolve_self_identity;
63pub use identity::{load_keys, resolve_identity, StoredKey};
64#[cfg(test)]
65use repo_metadata::pick_latest_event;
66use repo_metadata::{
67 append_repo_discovery_labels, build_git_repo_list_filter, build_repo_event_filter,
68 latest_repo_event_created_at, latest_trusted_pr_status_kinds, list_git_repo_announcements,
69 next_replaceable_created_at, pick_latest_repo_event, validate_repo_publish_relays,
70};
71
/// Parameterized-replaceable application-data kind (NIP-78 style) used for
/// hashtree root events; the repo name goes in the "d" tag.
pub const KIND_APP_DATA: u16 = 30078;

/// Git-collaboration event kinds (NIP-34 family — TODO confirm 1618 against
/// the published spec; 1630-1633 are the standard status kinds).
pub const KIND_PULL_REQUEST: u16 = 1618;
pub const KIND_STATUS_OPEN: u16 = 1630;
pub const KIND_STATUS_APPLIED: u16 = 1631;
pub const KIND_STATUS_CLOSED: u16 = 1632;
pub const KIND_STATUS_DRAFT: u16 = 1633;
/// Repository announcement kind (NIP-34).
pub const KIND_REPO_ANNOUNCEMENT: u16 = 30617;

/// "l" label values attached to repo events for discovery filtering.
pub const LABEL_HASHTREE: &str = "hashtree";
pub const LABEL_GIT: &str = "git";
86
/// Lifecycle state of a pull request, derived from the latest status event
/// (kinds 1630-1633) observed for it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PullRequestState {
    Open,
    Applied,
    Closed,
    Draft,
}
95
96impl PullRequestState {
97 pub fn as_str(self) -> &'static str {
98 match self {
99 PullRequestState::Open => "open",
100 PullRequestState::Applied => "applied",
101 PullRequestState::Closed => "closed",
102 PullRequestState::Draft => "draft",
103 }
104 }
105
106 fn from_status_kind(status_kind: u16) -> Option<Self> {
107 match status_kind {
108 KIND_STATUS_OPEN => Some(PullRequestState::Open),
109 KIND_STATUS_APPLIED => Some(PullRequestState::Applied),
110 KIND_STATUS_CLOSED => Some(PullRequestState::Closed),
111 KIND_STATUS_DRAFT => Some(PullRequestState::Draft),
112 _ => None,
113 }
114 }
115
116 fn from_latest_status_kind(status_kind: Option<u16>) -> Self {
117 status_kind
118 .and_then(Self::from_status_kind)
119 .unwrap_or(PullRequestState::Open)
120 }
121}
122
/// User-facing filter for listing pull requests; `All` disables filtering.
/// Defaults to `Open`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PullRequestStateFilter {
    #[default]
    Open,
    Applied,
    Closed,
    Draft,
    All,
}
133
134impl PullRequestStateFilter {
135 pub fn as_str(self) -> &'static str {
136 match self {
137 PullRequestStateFilter::Open => "open",
138 PullRequestStateFilter::Applied => "applied",
139 PullRequestStateFilter::Closed => "closed",
140 PullRequestStateFilter::Draft => "draft",
141 PullRequestStateFilter::All => "all",
142 }
143 }
144
145 fn includes(self, state: PullRequestState) -> bool {
146 match self {
147 PullRequestStateFilter::All => true,
148 PullRequestStateFilter::Open => state == PullRequestState::Open,
149 PullRequestStateFilter::Applied => state == PullRequestState::Applied,
150 PullRequestStateFilter::Closed => state == PullRequestState::Closed,
151 PullRequestStateFilter::Draft => state == PullRequestState::Draft,
152 }
153 }
154}
155
/// Summary row for one pull request, assembled from its PR event plus the
/// latest trusted status event.
#[derive(Debug, Clone)]
pub struct PullRequestListItem {
    /// Hex id of the PR event.
    pub event_id: String,
    /// Hex pubkey of the PR author.
    pub author_pubkey: String,
    /// Current state resolved from status events (defaults to open).
    pub state: PullRequestState,
    /// PR title/subject, if the event carried one.
    pub subject: Option<String>,
    /// Tip commit id advertised by the PR, if present.
    pub commit_tip: Option<String>,
    /// Source branch name, if present.
    pub branch: Option<String>,
    /// Target branch name, if present.
    pub target_branch: Option<String>,
    /// Unix timestamp of the PR event.
    pub created_at: u64,
}
168
/// Fetches events matching `filter` by speaking the relay websocket protocol
/// directly (REQ ... events ... EOSE), bypassing the nostr_sdk client.
///
/// Each relay gets its own connection bounded by `timeout`; per-relay
/// failures and timeouts are logged at debug level and skipped. Results are
/// deduplicated by event id across relays.
async fn fetch_events_via_raw_relay_query(
    relays: &[String],
    filter: Filter,
    timeout: Duration,
) -> Vec<Event> {
    // One REQ message reused for every relay; fresh random subscription id.
    let request_json = ClientMessage::req(SubscriptionId::generate(), vec![filter]).as_json();
    let mut events_by_id = HashMap::<String, Event>::new();

    for relay_url in relays {
        let relay_events = match tokio::time::timeout(timeout, async {
            let (mut ws, _) = connect_async(relay_url).await?;
            ws.send(WsMessage::Text(request_json.clone())).await?;

            let mut relay_events = Vec::new();
            while let Some(message) = ws.next().await {
                let message = message?;
                // Only text frames carry relay messages; skip ping/binary.
                let WsMessage::Text(text) = message else {
                    continue;
                };

                match RelayMessage::from_json(text.as_str()) {
                    Ok(RelayMessage::Event { event, .. }) => relay_events.push(*event),
                    // EOSE: all stored events delivered — stop reading.
                    Ok(RelayMessage::EndOfStoredEvents(_)) => break,
                    Ok(RelayMessage::Closed { message, .. }) => {
                        debug!("Raw relay PR query closed by {}: {}", relay_url, message);
                        break;
                    }
                    Ok(_) => {}
                    Err(err) => {
                        debug!(
                            "Failed to parse raw relay response from {}: {}",
                            relay_url, err
                        );
                    }
                }
            }

            // Best-effort close; errors here don't matter.
            let _ = ws.close(None).await;
            Ok::<Vec<Event>, anyhow::Error>(relay_events)
        })
        .await
        {
            Ok(Ok(events)) => events,
            Ok(Err(err)) => {
                debug!("Raw relay PR query failed for {}: {}", relay_url, err);
                continue;
            }
            Err(_) => {
                debug!("Raw relay PR query timed out for {}", relay_url);
                continue;
            }
        };

        // Dedup across relays: last write wins per event id (contents are
        // identical for the same id, so order is irrelevant).
        for event in relay_events {
            events_by_id.insert(event.id.to_hex(), event);
        }
    }

    events_by_id.into_values().collect()
}
229
230async fn connected_relay_count(client: &Client) -> (usize, usize) {
231 let relays = client.relays().await;
232 let total = relays.len();
233 let mut connected = 0;
234 for relay in relays.values() {
235 if relay.is_connected().await {
236 connected += 1;
237 }
238 }
239 (connected, total)
240}
241
242async fn wait_for_any_connected_relay(client: &Client, timeout: Duration) -> bool {
243 let start = std::time::Instant::now();
244 loop {
245 if connected_relay_count(client).await.0 > 0 {
246 return true;
247 }
248 if start.elapsed() > timeout {
249 return false;
250 }
251 tokio::time::sleep(Duration::from_millis(50)).await;
252 }
253}
254
/// Refs map, optional root hash, and optional decrypted encryption key, as
/// returned by the `fetch_refs_*` family of methods.
type FetchedRefs = (HashMap<String, String>, Option<String>, Option<[u8; 32]>);
// NOTE(review): this `use` belongs with the import block at the top of the file.
use hashtree_config::Config;
257
/// Per-relay outcome of publishing an event.
#[derive(Debug, Clone)]
pub struct RelayResult {
    /// All relays that were configured for the attempt.
    #[allow(dead_code)]
    pub configured: Vec<String>,
    /// Relays that accepted the event.
    pub connected: Vec<String>,
    /// Relays that could not be added or rejected/failed the publish.
    pub failed: Vec<String>,
}
269
/// Per-server outcome of a Blossom operation (presumably uploads — the
/// producing code is outside this chunk; confirm at the call sites).
#[derive(Debug, Clone)]
pub struct BlossomResult {
    /// All Blossom servers that were configured for the attempt.
    #[allow(dead_code)]
    pub configured: Vec<String>,
    /// Servers where the operation succeeded.
    pub succeeded: Vec<String>,
    /// Servers where the operation failed.
    pub failed: Vec<String>,
}
281
/// Client tying together nostr relays (repo pointer/metadata events) and
/// Blossom servers (content-addressed blob storage) for one repo author.
pub struct NostrClient {
    /// Hex public key of the repo author being accessed.
    pubkey: String,
    /// Signing keys; `None` means the client is read-only.
    keys: Option<Keys>,
    /// Relay URLs used for queries and publishes.
    relays: Vec<String>,
    /// Blob storage client for tree nodes and ref files.
    blossom: BlossomClient,
    /// Per-repo refs cache populated by the first successful fetch.
    cached_refs: HashMap<String, HashMap<String, String>>,
    /// Per-repo root hash recorded alongside the refs cache.
    cached_root_hash: HashMap<String, String>,
    /// Per-repo decrypted encryption key recorded alongside the refs cache.
    cached_encryption_key: HashMap<String, [u8; 32]>,
    /// Masking secret from the URL fragment (#k=<secret>) for link-visible repos.
    url_secret: Option<[u8; 32]>,
    /// Whether author-only (#private) access was requested.
    is_private: bool,
    /// Base URL of a loopback daemon used as a resolution fallback, if any.
    local_daemon_url: Option<String>,
}
303
/// Root-pointer data extracted from a repo event (or the local daemon): the
/// hashtree root hash plus whichever key material was attached.
#[derive(Debug, Clone, Default)]
struct RootEventData {
    /// Hex hash of the hashtree root node.
    root_hash: String,
    /// 32-byte key from a "key" tag, or the masked key from "encryptedKey".
    encryption_key: Option<[u8; 32]>,
    /// Which tag supplied the key: "key", "encryptedKey" or "selfEncryptedKey".
    key_tag_name: Option<String>,
    /// NIP-44 ciphertext from a "selfEncryptedKey" tag (author-only repos).
    self_encrypted_ciphertext: Option<String>,
}
311
/// JSON payload returned by the local daemon's resolve endpoint
/// (`/api/nostr/resolve/<pubkey>/<repo>`).
#[derive(Debug, Deserialize)]
struct DaemonResolveResponse {
    /// Root hash; absent/empty when the repo is unknown.
    hash: Option<String>,
    // NOTE(review): field `key` deserializes from JSON "key_tag" — verify the
    // daemon actually sends the plain key under that name, not under "key".
    #[serde(default, rename = "key_tag")]
    key: Option<String>,
    /// Masked key for link-visible repos.
    #[serde(default, rename = "encryptedKey")]
    encrypted_key: Option<String>,
    /// NIP-44 ciphertext for author-only repos.
    #[serde(default, rename = "selfEncryptedKey")]
    self_encrypted_key: Option<String>,
    /// Where the daemon resolved from; used only for logging.
    #[serde(default)]
    source: Option<String>,
}
324
325impl NostrClient {
    /// Builds a client for `pubkey`'s repos.
    ///
    /// `secret_key` (hex) enables signing/publishing; when absent the
    /// `NOSTR_SECRET_KEY` env var is tried, and without either the client is
    /// read-only. `url_secret` is the masking secret parsed from the htree
    /// URL fragment (link-visible repos); `is_private` requests author-only
    /// access.
    ///
    /// # Errors
    /// Fails when the provided secret key is not valid hex or not a valid
    /// secp256k1 secret key.
    pub fn new(
        pubkey: &str,
        secret_key: Option<String>,
        url_secret: Option<[u8; 32]>,
        is_private: bool,
        config: &Config,
    ) -> Result<Self> {
        // Install the ring crypto provider for rustls; the error (already
        // installed) is intentionally ignored.
        let _ = rustls::crypto::ring::default_provider().install_default();

        let secret_key = secret_key.or_else(|| std::env::var("NOSTR_SECRET_KEY").ok());

        let keys = if let Some(ref secret_hex) = secret_key {
            let secret_bytes = hex::decode(secret_hex).context("Invalid secret key hex")?;
            let secret = nostr::SecretKey::from_slice(&secret_bytes)
                .map_err(|e| anyhow::anyhow!("Invalid secret key: {}", e))?;
            Some(Keys::new(secret))
        } else {
            None
        };

        // Blossom requests are signed; fall back to a throwaway key for
        // read-only use.
        let blossom_keys = keys.clone().unwrap_or_else(Keys::generate);
        let blossom = BlossomClient::new(blossom_keys).with_timeout(Duration::from_secs(30));

        tracing::info!(
            "BlossomClient created with read_servers: {:?}, write_servers: {:?}",
            blossom.read_servers(),
            blossom.write_servers()
        );

        let relays = hashtree_config::resolve_relays(
            &config.nostr.relays,
            Some(config.server.bind_address.as_str()),
        );
        // Prefer a detected local daemon for resolution; otherwise reuse a
        // loopback Blossom read server if one is configured.
        let local_daemon_url =
            hashtree_config::detect_local_daemon_url(Some(config.server.bind_address.as_str()))
                .or_else(|| {
                    config
                        .blossom
                        .read_servers
                        .iter()
                        .find(|url| {
                            url.starts_with("http://127.0.0.1:")
                                || url.starts_with("http://localhost:")
                        })
                        .cloned()
                });

        Ok(Self {
            pubkey: pubkey.to_string(),
            keys,
            relays,
            blossom,
            cached_refs: HashMap::new(),
            cached_root_hash: HashMap::new(),
            cached_encryption_key: HashMap::new(),
            url_secret,
            is_private,
            local_daemon_url,
        })
    }
392
393 fn format_repo_author(pubkey_hex: &str) -> String {
394 PublicKey::from_hex(pubkey_hex)
395 .ok()
396 .and_then(|pk| pk.to_bech32().ok())
397 .unwrap_or_else(|| pubkey_hex.to_string())
398 }
399
    /// True when this client holds a secret key and can publish events.
    #[allow(dead_code)]
    pub fn can_sign(&self) -> bool {
        self.keys.is_some()
    }
405
    /// Blocking wrapper around [`Self::list_repos_async`].
    pub fn list_repos(&self) -> Result<Vec<String>> {
        block_on_result(self.list_repos_async())
    }
409
    /// Queries the configured relays for this pubkey's git repo announcements
    /// and returns the repo names (2s connect timeout, 3s query timeout).
    ///
    /// # Errors
    /// Fails when no relay connects, the query errors, or it times out.
    pub async fn list_repos_async(&self) -> Result<Vec<String>> {
        let client = Client::default();

        for relay in &self.relays {
            if let Err(e) = client.add_relay(relay).await {
                warn!("Failed to add relay {}: {}", relay, e);
            }
        }
        client.connect().await;

        if !wait_for_any_connected_relay(&client, Duration::from_secs(2)).await {
            let _ = client.disconnect().await;
            return Err(anyhow::anyhow!(
                "Failed to connect to any relay while listing repos"
            ));
        }

        let author = PublicKey::from_hex(&self.pubkey)
            .map_err(|e| anyhow::anyhow!("Invalid pubkey: {}", e))?;
        let filter = build_git_repo_list_filter(author);

        let events = match tokio::time::timeout(
            Duration::from_secs(3),
            client.get_events_of(vec![filter], EventSource::relays(None)),
        )
        .await
        {
            Ok(Ok(events)) => events,
            Ok(Err(e)) => {
                let _ = client.disconnect().await;
                return Err(anyhow::anyhow!(
                    "Failed to fetch git repo events from relays: {}",
                    e
                ));
            }
            Err(_) => {
                let _ = client.disconnect().await;
                return Err(anyhow::anyhow!(
                    "Timed out fetching git repo events from relays"
                ));
            }
        };

        let _ = client.disconnect().await;

        // Dedup/selection of announcements is handled by the helper.
        Ok(list_git_repo_announcements(&events)
            .into_iter()
            .map(|repo| repo.repo_name)
            .collect())
    }
460
461 pub fn fetch_refs(&mut self, repo_name: &str) -> Result<HashMap<String, String>> {
464 let (refs, _, _) = self.fetch_refs_with_timeout(repo_name, 10)?;
465 Ok(refs)
466 }
467
468 #[allow(dead_code)]
471 pub fn fetch_refs_quick(&mut self, repo_name: &str) -> Result<HashMap<String, String>> {
472 let (refs, _, _) = self.fetch_refs_with_timeout(repo_name, 3)?;
473 Ok(refs)
474 }
475
    /// Like [`Self::fetch_refs`] but also returns the root hash and the
    /// decrypted encryption key alongside the refs.
    #[allow(dead_code)]
    pub fn fetch_refs_with_root(&mut self, repo_name: &str) -> Result<FetchedRefs> {
        self.fetch_refs_with_timeout(repo_name, 10)
    }
482
    /// Shared implementation behind the `fetch_refs*` wrappers: serves from
    /// the in-memory caches when possible, otherwise resolves refs over the
    /// network (blocking on the async path) and populates the caches.
    fn fetch_refs_with_timeout(
        &mut self,
        repo_name: &str,
        timeout_secs: u64,
    ) -> Result<FetchedRefs> {
        debug!(
            "Fetching refs for {} from {} (timeout {}s)",
            repo_name, self.pubkey, timeout_secs
        );

        // Cache hit: refs (and any recorded root/key) were fetched earlier
        // in this session.
        if let Some(refs) = self.cached_refs.get(repo_name) {
            let root = self.cached_root_hash.get(repo_name).cloned();
            let key = self.cached_encryption_key.get(repo_name).cloned();
            return Ok((refs.clone(), root, key));
        }

        let (refs, root_hash, encryption_key) =
            block_on_result(self.fetch_refs_async_with_timeout(repo_name, timeout_secs))?;
        // Populate all three caches; root/key only when present so lookups
        // stay consistent with what the fetch actually produced.
        self.cached_refs.insert(repo_name.to_string(), refs.clone());
        if let Some(ref root) = root_hash {
            self.cached_root_hash
                .insert(repo_name.to_string(), root.clone());
        }
        if let Some(key) = encryption_key {
            self.cached_encryption_key
                .insert(repo_name.to_string(), key);
        }
        Ok((refs, root_hash, encryption_key))
    }
516
517 fn parse_root_event_data_from_event(event: &Event) -> RootEventData {
518 let root_hash = event
519 .tags
520 .iter()
521 .find(|t| t.as_slice().len() >= 2 && t.as_slice()[0].as_str() == "hash")
522 .map(|t| t.as_slice()[1].to_string())
523 .unwrap_or_else(|| event.content.to_string());
524
525 let (encryption_key, key_tag_name, self_encrypted_ciphertext) = event
526 .tags
527 .iter()
528 .find_map(|t| {
529 let slice = t.as_slice();
530 if slice.len() < 2 {
531 return None;
532 }
533 let tag_name = slice[0].as_str();
534 let tag_value = slice[1].to_string();
535 if tag_name == "selfEncryptedKey" {
536 return Some((None, Some(tag_name.to_string()), Some(tag_value)));
537 }
538 if tag_name == "key" || tag_name == "encryptedKey" {
539 if let Ok(bytes) = hex::decode(&tag_value) {
540 if bytes.len() == 32 {
541 let mut key = [0u8; 32];
542 key.copy_from_slice(&bytes);
543 return Some((Some(key), Some(tag_name.to_string()), None));
544 }
545 }
546 }
547 None
548 })
549 .unwrap_or((None, None, None));
550
551 RootEventData {
552 root_hash,
553 encryption_key,
554 key_tag_name,
555 self_encrypted_ciphertext,
556 }
557 }
558
559 fn parse_daemon_response_to_root_data(
560 response: DaemonResolveResponse,
561 ) -> Option<RootEventData> {
562 let root_hash = response.hash?;
563 if root_hash.is_empty() {
564 return None;
565 }
566
567 let mut data = RootEventData {
568 root_hash,
569 encryption_key: None,
570 key_tag_name: None,
571 self_encrypted_ciphertext: None,
572 };
573
574 if let Some(ciphertext) = response.self_encrypted_key {
575 data.key_tag_name = Some("selfEncryptedKey".to_string());
576 data.self_encrypted_ciphertext = Some(ciphertext);
577 return Some(data);
578 }
579
580 let (tag_name, tag_value) = if let Some(v) = response.encrypted_key {
581 ("encryptedKey", v)
582 } else if let Some(v) = response.key {
583 ("key", v)
584 } else {
585 return Some(data);
586 };
587
588 if let Ok(bytes) = hex::decode(&tag_value) {
589 if bytes.len() == 32 {
590 let mut key = [0u8; 32];
591 key.copy_from_slice(&bytes);
592 data.encryption_key = Some(key);
593 data.key_tag_name = Some(tag_name.to_string());
594 }
595 }
596
597 Some(data)
598 }
599
    /// Asks the loopback daemon (when one was detected) to resolve the repo's
    /// root pointer via `GET /api/nostr/resolve/<pubkey>/<repo>`.
    ///
    /// Any failure — no daemon URL, client build error, HTTP error status,
    /// bad JSON, empty hash — yields `None` so the caller can fall back to
    /// relay resolution.
    async fn fetch_root_from_local_daemon(
        &self,
        repo_name: &str,
        timeout: Duration,
    ) -> Option<RootEventData> {
        let base = self.local_daemon_url.as_ref()?;
        let url = format!(
            "{}/api/nostr/resolve/{}/{}",
            base.trim_end_matches('/'),
            self.pubkey,
            repo_name
        );

        let client = reqwest::Client::builder().timeout(timeout).build().ok()?;
        let response = client.get(&url).send().await.ok()?;
        if !response.status().is_success() {
            return None;
        }

        let payload: DaemonResolveResponse = response.json().await.ok()?;
        // Capture `source` before the payload is consumed; logging only.
        let source = payload
            .source
            .clone()
            .unwrap_or_else(|| "unknown".to_string());
        let parsed = Self::parse_daemon_response_to_root_data(payload)?;
        debug!(
            "Resolved repo {} via local daemon source={}",
            repo_name, source
        );
        Some(parsed)
    }
631
    /// Resolves the repo's root event (relays first, local daemon as
    /// fallback, up to two attempts), unmasks/decrypts the encryption key
    /// according to the key tag type, then walks the hashtree for refs.
    ///
    /// Returns `(refs, root_hash, decrypted_key)`; an empty published root
    /// hash yields empty refs rather than an error.
    ///
    /// # Errors
    /// Fails when no root event can be found anywhere, when access
    /// requirements aren't met (missing URL secret for link-visible repos,
    /// missing author keys / `#private` flag for private repos), or when the
    /// hashtree root cannot be downloaded.
    async fn fetch_refs_async_with_timeout(
        &self,
        repo_name: &str,
        timeout_secs: u64,
    ) -> Result<(HashMap<String, String>, Option<String>, Option<[u8; 32]>)> {
        let client = Client::default();

        for relay in &self.relays {
            if let Err(e) = client.add_relay(relay).await {
                warn!("Failed to add relay {}: {}", relay, e);
            }
        }

        client.connect().await;

        // Budget: 2s to connect, the remainder (min 3s) for the relay query.
        let connect_timeout = Duration::from_secs(2);
        let query_timeout = Duration::from_secs(timeout_secs.saturating_sub(2).max(3));
        let local_daemon_timeout = Duration::from_secs(4);
        let retry_delay = Duration::from_millis(300);
        let max_attempts = 2;

        let start = std::time::Instant::now();

        let author = PublicKey::from_hex(&self.pubkey)
            .map_err(|e| anyhow::anyhow!("Invalid pubkey: {}", e))?;

        let filter = build_repo_event_filter(author, repo_name);

        debug!("Querying relays for repo {} events", repo_name);

        let mut root_data = None;
        for attempt in 1..=max_attempts {
            // Wait (poll at 50ms) for at least one relay connection, with
            // progress logging every 500ms; give up after connect_timeout
            // and rely on the local-daemon fallback.
            let connect_start = std::time::Instant::now();
            let mut last_log = std::time::Instant::now();
            let mut has_connected_relay = false;
            loop {
                let (connected, total) = connected_relay_count(&client).await;
                if connected > 0 {
                    debug!(
                        "Connected to {}/{} relay(s) in {:?} (attempt {}/{})",
                        connected,
                        total,
                        start.elapsed(),
                        attempt,
                        max_attempts
                    );
                    has_connected_relay = true;
                    break;
                }
                if last_log.elapsed() > Duration::from_millis(500) {
                    debug!(
                        "Connecting to relays... (0/{} after {:?}, attempt {}/{})",
                        total,
                        start.elapsed(),
                        attempt,
                        max_attempts
                    );
                    last_log = std::time::Instant::now();
                }
                if connect_start.elapsed() > connect_timeout {
                    debug!(
                        "Timeout waiting for relay connections - continuing with local-daemon fallback"
                    );
                    break;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }

            // Only query relays when something actually connected; a query
            // failure degrades to "no events" rather than aborting.
            let events = if has_connected_relay {
                match client
                    .get_events_of(
                        vec![filter.clone()],
                        EventSource::relays(Some(query_timeout)),
                    )
                    .await
                {
                    Ok(events) => events,
                    Err(e) => {
                        warn!("Failed to fetch events: {}", e);
                        vec![]
                    }
                }
            } else {
                vec![]
            };

            debug!(
                "Got {} events from relays on attempt {}/{}",
                events.len(),
                attempt,
                max_attempts
            );
            let relay_event = pick_latest_repo_event(events.iter(), repo_name);

            if let Some(event) = relay_event {
                debug!(
                    "Found relay event with root hash: {}",
                    &event.content[..12.min(event.content.len())]
                );
                root_data = Some(Self::parse_root_event_data_from_event(event));
                break;
            }

            // Relays had nothing: try the loopback daemon before retrying.
            if let Some(data) = self
                .fetch_root_from_local_daemon(repo_name, local_daemon_timeout)
                .await
            {
                root_data = Some(data);
                break;
            }

            if attempt < max_attempts {
                debug!(
                    "No hashtree event found for {} on attempt {}/{}; retrying",
                    repo_name, attempt, max_attempts
                );
                tokio::time::sleep(retry_delay).await;
            }
        }

        let _ = client.disconnect().await;

        let root_data = match root_data {
            Some(data) => data,
            None => {
                anyhow::bail!(
                    "Repository '{}' not found (no hashtree event published by {})",
                    repo_name,
                    Self::format_repo_author(&self.pubkey)
                );
            }
        };

        let root_hash = root_data.root_hash;

        // An event with an empty hash means "repo exists but has no content".
        if root_hash.is_empty() {
            debug!("Empty root hash in event");
            return Ok((HashMap::new(), None, None));
        }

        let encryption_key = root_data.encryption_key;
        let key_tag_name = root_data.key_tag_name;
        let self_encrypted_ciphertext = root_data.self_encrypted_ciphertext;

        // Turn the published key material into a usable 32-byte key,
        // depending on how the repo was published.
        let unmasked_key = match key_tag_name.as_deref() {
            Some("encryptedKey") => {
                // Link-visible: published key is XOR-masked with the URL
                // fragment secret.
                if let (Some(masked), Some(secret)) = (encryption_key, self.url_secret) {
                    let mut unmasked = [0u8; 32];
                    for i in 0..32 {
                        unmasked[i] = masked[i] ^ secret[i];
                    }
                    Some(unmasked)
                } else {
                    anyhow::bail!(
                        "This repo is link-visible and requires a secret key.\n\
                         Use: htree://.../{repo_name}#k=<secret>\n\
                         Ask the repo owner for the full URL with the secret."
                    );
                }
            }
            Some("selfEncryptedKey") => {
                // Author-only: key is NIP-44 encrypted to the author's own
                // pubkey; requires the #private flag and the author's keys.
                if !self.is_private {
                    anyhow::bail!(
                        "This repo is private (author-only).\n\
                         Use: htree://.../{repo_name}#private\n\
                         Only the author can access this repo."
                    );
                }

                if let Some(keys) = &self.keys {
                    if let Some(ciphertext) = self_encrypted_ciphertext {
                        let pubkey = keys.public_key();
                        match nip44::decrypt(keys.secret_key(), &pubkey, &ciphertext) {
                            Ok(key_hex) => {
                                let key_bytes =
                                    hex::decode(&key_hex).context("Invalid decrypted key hex")?;
                                if key_bytes.len() != 32 {
                                    anyhow::bail!("Decrypted key wrong length");
                                }
                                let mut key = [0u8; 32];
                                key.copy_from_slice(&key_bytes);
                                Some(key)
                            }
                            Err(e) => {
                                anyhow::bail!(
                                    "Failed to decrypt private repo: {}\n\
                                     The repo may be corrupted or published with a different key.",
                                    e
                                );
                            }
                        }
                    } else {
                        anyhow::bail!("selfEncryptedKey tag has invalid format");
                    }
                } else {
                    anyhow::bail!(
                        "Cannot access this private repo.\n\
                         Private repos can only be accessed by their author.\n\
                         You don't have the secret key for this repo's owner."
                    );
                }
            }
            // Plain "key" tag (or no key at all): use as published.
            Some("key") | None => {
                encryption_key
            }
            Some(other) => {
                warn!("Unknown key tag type: {}", other);
                encryption_key
            }
        };

        info!(
            "Found root hash {} for {} (encrypted: {}, link_visible: {})",
            &root_hash[..12.min(root_hash.len())],
            repo_name,
            unmasked_key.is_some(),
            self.url_secret.is_some()
        );

        let refs = self
            .fetch_refs_from_hashtree(&root_hash, unmasked_key.as_ref())
            .await?;
        Ok((refs, Some(root_hash), unmasked_key))
    }
873
874 fn decrypt_and_decode(
876 &self,
877 data: &[u8],
878 key: Option<&[u8; 32]>,
879 ) -> Option<hashtree_core::TreeNode> {
880 let decrypted_data: Vec<u8>;
881 let data_to_decode = if let Some(k) = key {
882 match decrypt_chk(data, k) {
883 Ok(d) => {
884 decrypted_data = d;
885 &decrypted_data
886 }
887 Err(e) => {
888 debug!("Decryption failed: {}", e);
889 return None;
890 }
891 }
892 } else {
893 data
894 };
895
896 match decode_tree_node(data_to_decode) {
897 Ok(node) => Some(node),
898 Err(e) => {
899 debug!("Failed to decode tree node: {}", e);
900 None
901 }
902 }
903 }
904
905 async fn fetch_refs_from_hashtree(
908 &self,
909 root_hash: &str,
910 encryption_key: Option<&[u8; 32]>,
911 ) -> Result<HashMap<String, String>> {
912 let mut refs = HashMap::new();
913 debug!(
914 "fetch_refs_from_hashtree: downloading root {}",
915 &root_hash[..12]
916 );
917
918 let root_data = match self.blossom.download(root_hash).await {
920 Ok(data) => {
921 debug!("Downloaded {} bytes from blossom", data.len());
922 data
923 }
924 Err(e) => {
925 anyhow::bail!(
926 "Failed to download root hash {}: {}",
927 &root_hash[..12.min(root_hash.len())],
928 e
929 );
930 }
931 };
932
933 let root_node = match self.decrypt_and_decode(&root_data, encryption_key) {
935 Some(node) => {
936 debug!("Decoded root node with {} links", node.links.len());
937 node
938 }
939 None => {
940 debug!(
941 "Failed to decode root node (encryption_key: {})",
942 encryption_key.is_some()
943 );
944 return Ok(refs);
945 }
946 };
947
948 debug!(
950 "Root links: {:?}",
951 root_node
952 .links
953 .iter()
954 .map(|l| l.name.as_deref())
955 .collect::<Vec<_>>()
956 );
957 let git_link = root_node
958 .links
959 .iter()
960 .find(|l| l.name.as_deref() == Some(".git"));
961 let (git_hash, git_key) = match git_link {
962 Some(link) => {
963 debug!("Found .git link with key: {}", link.key.is_some());
964 (hex::encode(link.hash), link.key)
965 }
966 None => {
967 debug!("No .git directory in hashtree root");
968 return Ok(refs);
969 }
970 };
971
972 let git_data = match self.blossom.download(&git_hash).await {
974 Ok(data) => data,
975 Err(e) => {
976 anyhow::bail!(
977 "Failed to download .git directory ({}): {}",
978 &git_hash[..12],
979 e
980 );
981 }
982 };
983
984 let git_node = match self.decrypt_and_decode(&git_data, git_key.as_ref()) {
985 Some(node) => {
986 debug!(
987 "Decoded .git node with {} links: {:?}",
988 node.links.len(),
989 node.links
990 .iter()
991 .map(|l| l.name.as_deref())
992 .collect::<Vec<_>>()
993 );
994 node
995 }
996 None => {
997 debug!("Failed to decode .git node (key: {})", git_key.is_some());
998 return Ok(refs);
999 }
1000 };
1001
1002 let refs_link = git_node
1004 .links
1005 .iter()
1006 .find(|l| l.name.as_deref() == Some("refs"));
1007 let (refs_hash, refs_key) = match refs_link {
1008 Some(link) => (hex::encode(link.hash), link.key),
1009 None => {
1010 debug!("No refs directory in .git");
1011 return Ok(refs);
1012 }
1013 };
1014
1015 let refs_data = match self.blossom.try_download(&refs_hash).await {
1017 Some(data) => data,
1018 None => {
1019 debug!("Could not download refs directory");
1020 return Ok(refs);
1021 }
1022 };
1023
1024 let refs_node = match self.decrypt_and_decode(&refs_data, refs_key.as_ref()) {
1025 Some(node) => node,
1026 None => {
1027 return Ok(refs);
1028 }
1029 };
1030
1031 if let Some(head_link) = git_node
1033 .links
1034 .iter()
1035 .find(|l| l.name.as_deref() == Some("HEAD"))
1036 {
1037 let head_hash = hex::encode(head_link.hash);
1038 if let Some(head_data) = self.blossom.try_download(&head_hash).await {
1039 let head_content = if let Some(k) = head_link.key.as_ref() {
1041 match decrypt_chk(&head_data, k) {
1042 Ok(d) => String::from_utf8_lossy(&d).trim().to_string(),
1043 Err(_) => String::from_utf8_lossy(&head_data).trim().to_string(),
1044 }
1045 } else {
1046 String::from_utf8_lossy(&head_data).trim().to_string()
1047 };
1048 refs.insert("HEAD".to_string(), head_content);
1049 }
1050 }
1051
1052 for subdir_link in &refs_node.links {
1054 if subdir_link.link_type != LinkType::Dir {
1055 continue;
1056 }
1057 let subdir_name = match &subdir_link.name {
1058 Some(n) => n.clone(),
1059 None => continue,
1060 };
1061 let subdir_hash = hex::encode(subdir_link.hash);
1062
1063 self.collect_refs_recursive(
1064 &subdir_hash,
1065 subdir_link.key.as_ref(),
1066 &format!("refs/{}", subdir_name),
1067 &mut refs,
1068 )
1069 .await;
1070 }
1071
1072 debug!("Found {} refs from hashtree", refs.len());
1073 Ok(refs)
1074 }
1075
    /// Recursively walks a refs directory node, inserting `prefix/<name>` ->
    /// sha entries into `refs`. Best-effort: download/decode failures simply
    /// abandon the affected subtree.
    async fn collect_refs_recursive(
        &self,
        dir_hash: &str,
        dir_key: Option<&[u8; 32]>,
        prefix: &str,
        refs: &mut HashMap<String, String>,
    ) {
        let dir_data = match self.blossom.try_download(dir_hash).await {
            Some(data) => data,
            None => return,
        };

        let dir_node = match self.decrypt_and_decode(&dir_data, dir_key) {
            Some(node) => node,
            None => return,
        };

        for link in &dir_node.links {
            let name = match &link.name {
                Some(n) => n.clone(),
                None => continue,
            };
            let link_hash = hex::encode(link.hash);
            let ref_path = format!("{}/{}", prefix, name);

            if link.link_type == LinkType::Dir {
                // Box::pin: async recursion requires a heap-allocated future.
                Box::pin(self.collect_refs_recursive(
                    &link_hash,
                    link.key.as_ref(),
                    &ref_path,
                    refs,
                ))
                .await;
            } else {
                // Leaf blob holds the ref's commit sha (possibly CHK
                // encrypted); on decrypt failure fall back to the raw bytes.
                if let Some(ref_data) = self.blossom.try_download(&link_hash).await {
                    let sha = if let Some(k) = link.key.as_ref() {
                        match decrypt_chk(&ref_data, k) {
                            Ok(d) => String::from_utf8_lossy(&d).trim().to_string(),
                            Err(_) => String::from_utf8_lossy(&ref_data).trim().to_string(),
                        }
                    } else {
                        String::from_utf8_lossy(&ref_data).trim().to_string()
                    };
                    if !sha.is_empty() {
                        debug!("Found ref {} -> {}", ref_path, sha);
                        refs.insert(ref_path, sha);
                    }
                }
            }
        }
    }
1131
1132 #[allow(dead_code)]
1134 pub fn update_ref(&mut self, repo_name: &str, ref_name: &str, sha: &str) -> Result<()> {
1135 info!("Updating ref {} -> {} for {}", ref_name, sha, repo_name);
1136
1137 let refs = self.cached_refs.entry(repo_name.to_string()).or_default();
1138 refs.insert(ref_name.to_string(), sha.to_string());
1139
1140 Ok(())
1141 }
1142
1143 pub fn delete_ref(&mut self, repo_name: &str, ref_name: &str) -> Result<()> {
1145 info!("Deleting ref {} for {}", ref_name, repo_name);
1146
1147 if let Some(refs) = self.cached_refs.get_mut(repo_name) {
1148 refs.remove(ref_name);
1149 }
1150
1151 Ok(())
1152 }
1153
    /// Root hash recorded by the last successful ref fetch for `repo_name`.
    pub fn get_cached_root_hash(&self, repo_name: &str) -> Option<&String> {
        self.cached_root_hash.get(repo_name)
    }

    /// Decrypted encryption key cached for `repo_name`, if any.
    pub fn get_cached_encryption_key(&self, repo_name: &str) -> Option<&[u8; 32]> {
        self.cached_encryption_key.get(repo_name)
    }

    /// Blossom client used for blob downloads/uploads.
    pub fn blossom(&self) -> &BlossomClient {
        &self.blossom
    }

    /// Configured relay URLs (cloned).
    pub fn relay_urls(&self) -> Vec<String> {
        self.relays.clone()
    }

    /// Hex public key this client operates on.
    #[allow(dead_code)]
    pub fn pubkey(&self) -> &str {
        &self.pubkey
    }
1179
1180 pub fn npub(&self) -> String {
1182 PublicKey::from_hex(&self.pubkey)
1183 .ok()
1184 .and_then(|pk| pk.to_bech32().ok())
1185 .unwrap_or_else(|| self.pubkey.clone())
1186 }
1187
    /// Publishes `repo_name` pointing at `root_hash` as a replaceable nostr
    /// event (blocking wrapper around [`Self::publish_repo_async`]).
    ///
    /// `encryption_key` is `(key, is_link_visible, is_self_private)`:
    /// self-private repos publish a NIP-44 self-encrypted key, link-visible
    /// repos publish the masked key, otherwise the key is published as plain
    /// hex.
    ///
    /// # Errors
    /// Fails when no signing key is available (read-only client) or when the
    /// async publish/validation fails.
    pub fn publish_repo(
        &self,
        repo_name: &str,
        root_hash: &str,
        encryption_key: Option<(&[u8; 32], bool, bool)>,
    ) -> Result<(String, RelayResult)> {
        // NOTE(review): slicing assumes pubkey has at least 16 chars — a
        // shorter string here would panic; confirm pubkey is validated upstream.
        let keys = self.keys.as_ref().context(format!(
            "Cannot push: no secret key for {}. You can only push to your own repos.",
            &self.pubkey[..16]
        ))?;

        info!(
            "Publishing repo {} with root hash {} (encrypted: {})",
            repo_name,
            root_hash,
            encryption_key.is_some()
        );

        block_on_result(self.publish_repo_async(keys, repo_name, root_hash, encryption_key))
    }
1216
    /// Async implementation of [`Self::publish_repo`]: builds, signs and
    /// sends the kind-30078 root event, and reports per-relay outcomes.
    ///
    /// Returns the `htree://<npub>/<repo>` URL plus a [`RelayResult`]; fails
    /// when `validate_repo_publish_relays` rejects the publish outcome.
    async fn publish_repo_async(
        &self,
        keys: &Keys,
        repo_name: &str,
        root_hash: &str,
        encryption_key: Option<(&[u8; 32], bool, bool)>,
    ) -> Result<(String, RelayResult)> {
        let client = Client::new(keys.clone());

        let configured: Vec<String> = self.relays.clone();
        let mut connected: Vec<String> = Vec::new();
        let mut failed: Vec<String> = Vec::new();

        for relay in &self.relays {
            if let Err(e) = client.add_relay(relay).await {
                warn!("Failed to add relay {}: {}", relay, e);
                failed.push(relay.clone());
            }
        }

        client.connect().await;

        // Best effort: proceed even if nothing connected within 3s; send_event
        // below will surface the failures.
        let _ = wait_for_any_connected_relay(&client, Duration::from_secs(3)).await;

        // Choose a created_at that supersedes the previously published
        // replaceable event for this repo (helper from repo_metadata).
        let publish_created_at = next_replaceable_created_at(
            Timestamp::now(),
            latest_repo_event_created_at(
                &client,
                keys.public_key(),
                repo_name,
                Duration::from_secs(2),
            )
            .await,
        );

        // "d" makes the event parameterized-replaceable per repo; "l"/"hash"
        // carry the discovery label and the hashtree root.
        let mut tags = vec![
            Tag::custom(TagKind::custom("d"), vec![repo_name.to_string()]),
            Tag::custom(TagKind::custom("l"), vec![LABEL_HASHTREE.to_string()]),
            Tag::custom(TagKind::custom("hash"), vec![root_hash.to_string()]),
        ];

        // Key publication modes (see fetch side for the matching decoding):
        // self-private -> NIP-44 ciphertext to own pubkey; link-visible ->
        // masked key hex; otherwise plain key hex.
        if let Some((key, is_link_visible, is_self_private)) = encryption_key {
            if is_self_private {
                let pubkey = keys.public_key();
                let key_hex = hex::encode(key);
                let encrypted =
                    nip44::encrypt(keys.secret_key(), &pubkey, &key_hex, nip44::Version::V2)
                        .map_err(|e| anyhow::anyhow!("NIP-44 encryption failed: {}", e))?;
                tags.push(Tag::custom(
                    TagKind::custom("selfEncryptedKey"),
                    vec![encrypted],
                ));
            } else if is_link_visible {
                tags.push(Tag::custom(
                    TagKind::custom("encryptedKey"),
                    vec![hex::encode(key)],
                ));
            } else {
                tags.push(Tag::custom(TagKind::custom("key"), vec![hex::encode(key)]));
            }
        }

        append_repo_discovery_labels(&mut tags, repo_name);

        let event = EventBuilder::new(Kind::Custom(KIND_APP_DATA), root_hash, tags)
            .custom_created_at(publish_created_at)
            .to_event(keys)
            .map_err(|e| anyhow::anyhow!("Failed to sign event: {}", e))?;

        match client.send_event(event.clone()).await {
            Ok(output) => {
                for url in output.success.iter() {
                    let url_str = url.to_string();
                    if !connected.contains(&url_str) {
                        connected.push(url_str);
                    }
                }
                // Only count entries that carry an actual error message, and
                // never double-report a relay that also succeeded.
                for (url, err) in output.failed.iter() {
                    if err.is_some() {
                        let url_str = url.to_string();
                        if !failed.contains(&url_str) && !connected.contains(&url_str) {
                            failed.push(url_str);
                        }
                    }
                }
                info!(
                    "Sent event {} to {} relays ({} failed)",
                    output.id(),
                    output.success.len(),
                    output.failed.len()
                );
            }
            Err(e) => {
                // Total send failure: mark every configured relay as failed.
                warn!("Failed to send event: {}", e);
                for relay in &self.relays {
                    if !failed.contains(relay) {
                        failed.push(relay.clone());
                    }
                }
            }
        };

        // NOTE(review): the hex fallback slices pubkey[..16] and would panic
        // on a short pubkey — confirm pubkey is validated upstream.
        let npub_url = keys
            .public_key()
            .to_bech32()
            .map(|npub| format!("htree://{}/{}", npub, repo_name))
            .unwrap_or_else(|_| format!("htree://{}/{}", &self.pubkey[..16], repo_name));

        // Validate first, but disconnect before propagating the result so the
        // connection is always cleaned up.
        let relay_validation = validate_repo_publish_relays(&configured, &connected);

        let _ = client.disconnect().await;
        tokio::time::sleep(Duration::from_millis(50)).await;

        relay_validation?;

        Ok((
            npub_url,
            RelayResult {
                configured,
                connected,
                failed,
            },
        ))
    }
1361
1362 pub fn fetch_prs(
1364 &self,
1365 repo_name: &str,
1366 state_filter: PullRequestStateFilter,
1367 ) -> Result<Vec<PullRequestListItem>> {
1368 block_on_result(self.fetch_prs_async(repo_name, state_filter))
1369 }
1370
1371 pub async fn fetch_prs_async(
1372 &self,
1373 repo_name: &str,
1374 state_filter: PullRequestStateFilter,
1375 ) -> Result<Vec<PullRequestListItem>> {
1376 let client = Client::default();
1377
1378 for relay in &self.relays {
1379 if let Err(e) = client.add_relay(relay).await {
1380 warn!("Failed to add relay {}: {}", relay, e);
1381 }
1382 }
1383 client.connect().await;
1384
1385 if !wait_for_any_connected_relay(&client, Duration::from_secs(2)).await {
1387 let _ = client.disconnect().await;
1388 return Err(anyhow::anyhow!(
1389 "Failed to connect to any relay while fetching PRs"
1390 ));
1391 }
1392
1393 let repo_address = format!("{}:{}:{}", KIND_REPO_ANNOUNCEMENT, self.pubkey, repo_name);
1395 let pull_request_filter = Filter::new()
1396 .kind(Kind::Custom(KIND_PULL_REQUEST))
1397 .custom_tag(SingleLetterTag::lowercase(Alphabet::A), vec![&repo_address]);
1398
1399 let mut pr_events = match tokio::time::timeout(
1400 Duration::from_secs(3),
1401 client.get_events_of(vec![pull_request_filter.clone()], EventSource::relays(None)),
1402 )
1403 .await
1404 {
1405 Ok(Ok(events)) => events,
1406 Ok(Err(e)) => {
1407 let _ = client.disconnect().await;
1408 return Err(anyhow::anyhow!(
1409 "Failed to fetch PR events from relays: {}",
1410 e
1411 ));
1412 }
1413 Err(_) => {
1414 let _ = client.disconnect().await;
1415 return Err(anyhow::anyhow!("Timed out fetching PR events from relays"));
1416 }
1417 };
1418
1419 if pr_events.is_empty() {
1420 let fallback_events = fetch_events_via_raw_relay_query(
1421 &self.relays,
1422 pull_request_filter,
1423 Duration::from_secs(3),
1424 )
1425 .await;
1426 if !fallback_events.is_empty() {
1427 debug!(
1428 "Raw relay fallback recovered {} PR event(s) for {}",
1429 fallback_events.len(),
1430 repo_name
1431 );
1432 pr_events = fallback_events;
1433 }
1434 }
1435
1436 if pr_events.is_empty() {
1437 let _ = client.disconnect().await;
1438 return Ok(Vec::new());
1439 }
1440
1441 let pr_ids: Vec<String> = pr_events.iter().map(|e| e.id.to_hex()).collect();
1443
1444 let status_event_filter = Filter::new()
1446 .kinds(vec![
1447 Kind::Custom(KIND_STATUS_OPEN),
1448 Kind::Custom(KIND_STATUS_APPLIED),
1449 Kind::Custom(KIND_STATUS_CLOSED),
1450 Kind::Custom(KIND_STATUS_DRAFT),
1451 ])
1452 .custom_tag(
1453 SingleLetterTag::lowercase(Alphabet::E),
1454 pr_ids.iter().map(|s| s.as_str()).collect::<Vec<_>>(),
1455 );
1456
1457 let mut status_events = match tokio::time::timeout(
1458 Duration::from_secs(3),
1459 client.get_events_of(vec![status_event_filter.clone()], EventSource::relays(None)),
1460 )
1461 .await
1462 {
1463 Ok(Ok(events)) => events,
1464 Ok(Err(e)) => {
1465 let _ = client.disconnect().await;
1466 return Err(anyhow::anyhow!(
1467 "Failed to fetch PR status events from relays: {}",
1468 e
1469 ));
1470 }
1471 Err(_) => {
1472 let _ = client.disconnect().await;
1473 return Err(anyhow::anyhow!(
1474 "Timed out fetching PR status events from relays"
1475 ));
1476 }
1477 };
1478
1479 if status_events.is_empty() {
1480 let fallback_events = fetch_events_via_raw_relay_query(
1481 &self.relays,
1482 status_event_filter,
1483 Duration::from_secs(3),
1484 )
1485 .await;
1486 if !fallback_events.is_empty() {
1487 debug!(
1488 "Raw relay fallback recovered {} PR status event(s) for {}",
1489 fallback_events.len(),
1490 repo_name
1491 );
1492 status_events = fallback_events;
1493 }
1494 }
1495
1496 let _ = client.disconnect().await;
1497
1498 let latest_status =
1500 latest_trusted_pr_status_kinds(&pr_events, &status_events, &self.pubkey);
1501
1502 let mut prs = Vec::new();
1503 for event in &pr_events {
1504 let pr_id = event.id.to_hex();
1505 let state =
1506 PullRequestState::from_latest_status_kind(latest_status.get(&pr_id).copied());
1507 if !state_filter.includes(state) {
1508 continue;
1509 }
1510
1511 let mut subject = None;
1512 let mut commit_tip = None;
1513 let mut branch = None;
1514 let mut target_branch = None;
1515
1516 for tag in event.tags.iter() {
1517 let slice = tag.as_slice();
1518 if slice.len() >= 2 {
1519 match slice[0].as_str() {
1520 "subject" => subject = Some(slice[1].to_string()),
1521 "c" => commit_tip = Some(slice[1].to_string()),
1522 "branch" => branch = Some(slice[1].to_string()),
1523 "target-branch" => target_branch = Some(slice[1].to_string()),
1524 _ => {}
1525 }
1526 }
1527 }
1528
1529 prs.push(PullRequestListItem {
1530 event_id: pr_id,
1531 author_pubkey: event.pubkey.to_hex(),
1532 state,
1533 subject,
1534 commit_tip,
1535 branch,
1536 target_branch,
1537 created_at: event.created_at.as_u64(),
1538 });
1539 }
1540
1541 prs.sort_by(|left, right| {
1543 right
1544 .created_at
1545 .cmp(&left.created_at)
1546 .then_with(|| right.event_id.cmp(&left.event_id))
1547 });
1548
1549 debug!(
1550 "Found {} PRs for {} (filter: {:?})",
1551 prs.len(),
1552 repo_name,
1553 state_filter
1554 );
1555 Ok(prs)
1556 }
1557
1558 pub fn publish_pr_merged_status(
1560 &self,
1561 pr_event_id: &str,
1562 pr_author_pubkey: &str,
1563 ) -> Result<()> {
1564 let keys = self
1565 .keys
1566 .as_ref()
1567 .context("Cannot publish status: no secret key")?;
1568
1569 block_on_result(self.publish_pr_merged_status_async(keys, pr_event_id, pr_author_pubkey))
1570 }
1571
1572 async fn publish_pr_merged_status_async(
1573 &self,
1574 keys: &Keys,
1575 pr_event_id: &str,
1576 pr_author_pubkey: &str,
1577 ) -> Result<()> {
1578 let client = Client::new(keys.clone());
1579
1580 for relay in &self.relays {
1581 if let Err(e) = client.add_relay(relay).await {
1582 warn!("Failed to add relay {}: {}", relay, e);
1583 }
1584 }
1585 client.connect().await;
1586
1587 if !wait_for_any_connected_relay(&client, Duration::from_secs(3)).await {
1589 anyhow::bail!("Failed to connect to any relay for status publish");
1590 }
1591
1592 let tags = vec![
1593 Tag::custom(TagKind::custom("e"), vec![pr_event_id.to_string()]),
1594 Tag::custom(TagKind::custom("p"), vec![pr_author_pubkey.to_string()]),
1595 ];
1596
1597 let event = EventBuilder::new(Kind::Custom(KIND_STATUS_APPLIED), "", tags)
1598 .to_event(keys)
1599 .map_err(|e| anyhow::anyhow!("Failed to sign status event: {}", e))?;
1600
1601 let publish_result = match client.send_event(event).await {
1602 Ok(output) => {
1603 if output.success.is_empty() {
1604 Err(anyhow::anyhow!(
1605 "PR merged status was not confirmed by any relay"
1606 ))
1607 } else {
1608 info!(
1609 "Published PR merged status to {} relays",
1610 output.success.len()
1611 );
1612 Ok(())
1613 }
1614 }
1615 Err(e) => Err(anyhow::anyhow!("Failed to publish PR merged status: {}", e)),
1616 };
1617
1618 let _ = client.disconnect().await;
1619 tokio::time::sleep(Duration::from_millis(50)).await;
1620 publish_result
1621 }
1622
1623 #[allow(dead_code)]
1625 pub async fn upload_blob(&self, _hash: &str, data: &[u8]) -> Result<String> {
1626 let hash = self
1627 .blossom
1628 .upload(data)
1629 .await
1630 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))?;
1631 Ok(hash)
1632 }
1633
1634 #[allow(dead_code)]
1636 pub async fn upload_blob_if_missing(&self, data: &[u8]) -> Result<(String, bool)> {
1637 self.blossom
1638 .upload_if_missing(data)
1639 .await
1640 .map_err(|e| anyhow::anyhow!("Blossom upload failed: {}", e))
1641 }
1642
1643 #[allow(dead_code)]
1645 pub async fn download_blob(&self, hash: &str) -> Result<Vec<u8>> {
1646 self.blossom
1647 .download(hash)
1648 .await
1649 .map_err(|e| anyhow::anyhow!("Blossom download failed: {}", e))
1650 }
1651
    /// Best-effort blob download: delegates to `BlossomClient::try_download`,
    /// yielding `None` when no blob could be retrieved (exact failure
    /// semantics live in the Blossom client).
    #[allow(dead_code)]
    pub async fn try_download_blob(&self, hash: &str) -> Option<Vec<u8>> {
        self.blossom.try_download(hash).await
    }
1657}
1658
1659#[cfg(test)]
1660mod tests;