1use anyhow::{Context, Result};
3use nostr_sdk::nips::nip04;
4use nostr_sdk::nips::nip59::UnwrappedGift;
5use nostr_sdk::{
6 Client, EventBuilder, Filter, Keys, Kind, RelayPoolNotification, Tag, Timestamp, ToBech32,
7};
8use serde::{Deserialize, Serialize};
9use std::future::Future;
10use std::pin::Pin;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13use tracing::{debug, error, info, warn};
14
/// Event kind under which provider offers are published and queried
/// (see `publish_provider_offer` / `query_providers`).
pub const KIND_PROVIDER_OFFER: u16 = 38_383;
/// Event kind of the "stored" heartbeat published by `publish_heartbeat`.
pub const KIND_PROVIDER_HEARTBEAT: u16 = 38_384;
/// Event kind of the "ephemeral" heartbeat published alongside the stored one.
pub const KIND_PROVIDER_HEARTBEAT_EPHEMERAL: u16 = 20_384;
/// Event kind of lease revocation events (see `publish_lease_revocation`).
pub const KIND_LEASE_REVOCATION: u16 = 38_385;

/// Event kind of standby promotion announcements
/// (see `publish_standby_promotion_announcement`).
pub const KIND_STANDBY_PROMOTION_ANNOUNCEMENT: u16 = 38_386;

/// Schema version used as the serde default for the `version` field of the
/// content structs in this file and embedded in their `d` tags.
pub const SCHEMA_VERSION: u8 = 1;

/// Look-back window, in seconds, applied by `get_latest_heartbeat` when it
/// searches for a recent heartbeat.
pub const LIVE_HEARTBEAT_WINDOW_SECS: u64 = 5 * 60;

/// Width, in seconds, of the time bucket that `publish_heartbeat` derives from
/// the heartbeat timestamp and embeds in the stored heartbeat's `d` tag.
pub const HEARTBEAT_BUCKET_SECS: u64 = 60;
/// Connection settings consumed by `NostrRelaySubscriber::new`.
#[derive(Debug, Clone)]
pub struct RelayConfig {
    /// URLs of the relays to add to the client.
    pub relays: Vec<String>,
    /// Optional private key string. When it is `None` or empty,
    /// `NostrRelaySubscriber::new` generates a fresh key pair instead.
    pub private_key: Option<String>,
}
76
77#[derive(Debug, Clone, Deserialize, Serialize)]
78pub struct NostrEvent {
79 pub id: String,
80 pub pubkey: String,
81 pub created_at: u64,
82 pub kind: u32,
83 pub tags: Vec<Vec<String>>,
84 pub content: String,
85 pub sig: String,
86 pub message_type: String, }
88
89#[derive(Clone)]
90pub struct NostrRelaySubscriber {
91 client: Client,
92 keys: Keys,
93 }
95
96impl NostrRelaySubscriber {
97 pub async fn new(config: RelayConfig) -> Result<Self> {
98 let keys = match &config.private_key {
99 Some(private_key_hex) if !private_key_hex.is_empty() => {
100 if private_key_hex.starts_with("nsec1") {
102 Keys::parse(private_key_hex).context("Invalid nsec private key format")?
103 } else {
104 Keys::parse(private_key_hex).context("Invalid private key format")?
106 }
107 }
108 _ => {
109 Keys::generate()
111 }
112 };
113
114 let client = Client::new(keys.clone());
115
116 for relay_url in &config.relays {
118 info!("Adding relay: {}", relay_url);
119 client
120 .add_relay(relay_url)
121 .await
122 .with_context(|| format!("Invalid relay URL: {}", relay_url))?;
123 }
124
125 info!("Connecting to {} relays...", config.relays.len());
126 client.connect().await;
127
128 tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
130
131 info!("Connected to {} relays", config.relays.len());
132 info!(
133 "Service public key (npub): {}",
134 keys.public_key().to_bech32().unwrap()
135 );
136
137 Ok(Self { client, keys })
138 }
139
140 pub fn public_key(&self) -> nostr_sdk::PublicKey {
141 self.keys.public_key()
142 }
143
144 pub async fn subscribe_to_pod_events<F>(&self, handler: F) -> Result<()>
145 where
146 F: Fn(NostrEvent) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
147 + Send
148 + Sync
149 + 'static,
150 {
151 let nip04_filter = Filter::new()
153 .kind(Kind::EncryptedDirectMessage)
154 .pubkeys(vec![self.keys.public_key()]) .limit(0);
156
157 let nip17_filter = Filter::new()
158 .kind(Kind::GiftWrap)
159 .pubkeys(vec![self.keys.public_key()]) .limit(0);
161
162 let revocation_filter = Filter::new()
168 .kind(Kind::Custom(KIND_LEASE_REVOCATION))
169 .pubkeys(vec![self.keys.public_key()])
170 .limit(0);
171
172 let _ = self.client.subscribe(nip04_filter, None).await;
173 let _ = self.client.subscribe(nip17_filter, None).await;
174 let _ = self.client.subscribe(revocation_filter, None).await;
175 info!("Subscribed to NIP-04 / NIP-17 messages and KIND_LEASE_REVOCATION events addressed to this provider");
176
177 self.client.handle_notifications(|notification| async {
179 if let RelayPoolNotification::Event { relay_url: _, subscription_id: _, event } = notification {
180 match event.kind {
181 Kind::GiftWrap => {
182 info!("Received NIP-17 Gift Wrap message: {}", event.id);
183
184 match self.client.unwrap_gift_wrap(&event).await {
186 Ok(UnwrappedGift { rumor, sender }) => {
187 info!("Unwrapped Gift Wrap from sender: {}, rumor kind: {}", sender, rumor.kind);
188
189 if rumor.kind == Kind::PrivateDirectMessage {
191 debug!("NIP-17 rumor is PrivateDirectMessage. Content length: {}", rumor.content.len());
192
193 let nostr_event = NostrEvent {
195 id: rumor.id.map(|id| id.to_hex()).unwrap_or_else(|| "unknown".to_string()),
196 pubkey: rumor.pubkey.to_hex(),
197 created_at: rumor.created_at.as_u64(),
198 kind: rumor.kind.as_u16() as u32,
199 tags: rumor.tags.iter().map(|tag| {
200 tag.as_slice().iter().map(|s| s.to_string()).collect()
201 }).collect(),
202 content: rumor.content,
203 sig: "unsigned".to_string(), message_type: "nip17".to_string(), };
206
207 match handler(nostr_event).await {
208 Ok(()) => {
209 info!("Successfully processed NIP-17 private message: {}", event.id);
210 }
211 Err(e) => {
212 error!("Failed to process NIP-17 private message {}: {}", event.id, e);
213 }
214 }
215 } else {
216 info!("Rumor is not a private direct message, kind: {}", rumor.kind);
217 }
218 }
219 Err(e) => {
220 error!("Failed to unwrap Gift Wrap {}: {}", event.id, e);
221 }
222 }
223 }
224 Kind::EncryptedDirectMessage => {
225 info!("Received NIP-04 Encrypted Direct Message: {}", event.id);
226
227 let secret_key = self.keys.secret_key();
228 match nip04::decrypt(secret_key, &event.pubkey, &event.content) {
229 Ok(decrypted_content) => {
230 debug!(
231 "Decrypted NIP-04 message. Length: {}",
232 decrypted_content.len()
233 );
234
235 let nostr_event = NostrEvent {
236 id: event.id.to_hex(),
237 pubkey: event.pubkey.to_hex(),
238 created_at: event.created_at.as_u64(),
239 kind: event.kind.as_u16() as u32,
240 tags: event
241 .tags
242 .iter()
243 .map(|tag| {
244 tag.as_slice()
245 .iter()
246 .map(|s| s.to_string())
247 .collect()
248 })
249 .collect(),
250 content: decrypted_content,
251 sig: event.sig.to_string(),
252 message_type: "nip04".to_string(),
253 };
254
255 match handler(nostr_event).await {
256 Ok(()) => info!(
257 "Successfully processed NIP-04 private message: {}",
258 event.id
259 ),
260 Err(e) => error!(
261 "Failed to process NIP-04 private message {}: {}",
262 event.id, e
263 ),
264 }
265 }
266 Err(e) => {
267 error!(
268 "Failed to decrypt NIP-04 message {}: {}",
269 event.id, e
270 );
271 }
272 }
273 }
274 Kind::Custom(k) if k == KIND_LEASE_REVOCATION => {
275 info!("Received lease revocation event: {}", event.id);
280 let nostr_event = NostrEvent {
281 id: event.id.to_hex(),
282 pubkey: event.pubkey.to_hex(),
283 created_at: event.created_at.as_u64(),
284 kind: event.kind.as_u16() as u32,
285 tags: event
286 .tags
287 .iter()
288 .map(|tag| {
289 tag.as_slice().iter().map(|s| s.to_string()).collect()
290 })
291 .collect(),
292 content: event.content.clone(),
293 sig: event.sig.to_string(),
294 message_type: "lease_revocation".to_string(),
295 };
296 if let Err(e) = handler(nostr_event).await {
297 error!("Failed to process lease revocation {}: {}", event.id, e);
298 }
299 }
300 _ => {
301 info!("Received unsupported event kind: {}", event.kind);
302 }
303 }
304 }
305 Ok(false) }).await?;
307
308 Ok(())
309 }
310
311 pub async fn publish_offer(&self, offer: OfferEventContent) -> Result<String> {
312 let content = serde_json::to_string(&offer)?;
313 info!("Publishing offer event with content: {}", content);
314
315 let tags = vec![Tag::hashtag("paygress"), Tag::hashtag("offer")];
316
317 info!("Creating event with kind 999 and {} tags", tags.len());
318 let event = EventBuilder::new(Kind::Custom(999), content)
319 .tags(tags)
320 .sign_with_keys(&self.keys)?;
321 let event_id = event.id.to_hex();
322
323 info!("Event created with ID: {}", event_id);
324 info!("Sending offer event to relays: {}", event_id);
325
326 match self.client.send_event(&event).await {
327 Ok(res) => {
328 info!(
329 "✅ Successfully published offer event: {} and {:?}",
330 event_id, res
331 );
332 Ok(event_id)
333 }
334 Err(e) => {
335 error!("❌ Failed to send offer event: {}", e);
336 Err(e.into())
337 }
338 }
339 }
340
341 pub async fn send_encrypted_private_message(
343 &self,
344 receiver_pubkey: &str,
345 content: String,
346 message_type: &str,
347 ) -> Result<String> {
348 let receiver_pubkey_parsed = nostr_sdk::PublicKey::parse(receiver_pubkey)?;
349
350 match message_type {
351 "nip04" => {
352 let secret_key = self.keys.secret_key();
353 let encrypted_content =
354 nip04::encrypt(secret_key, &receiver_pubkey_parsed, &content)?;
355 let receiver_tag = Tag::public_key(receiver_pubkey_parsed);
356 let alt_tag = Tag::parse(["alt", "Private Message"])?;
357
358 let event = EventBuilder::new(Kind::EncryptedDirectMessage, encrypted_content)
359 .tags([receiver_tag, alt_tag])
360 .sign_with_keys(&self.keys)?;
361 let event_id = self.client.send_event(&event).await?;
362 info!("Sent NIP-04 message to {}: {:?}", receiver_pubkey, event_id);
363 Ok(event_id.val.to_hex())
364 }
365 "nip17" | _ => {
366 let event_id = self
368 .client
369 .send_private_msg(receiver_pubkey_parsed, content, [])
370 .await?;
371 info!("Sent NIP-17 message to {}: {:?}", receiver_pubkey, event_id);
372 Ok(event_id.val.to_hex())
373 }
374 }
375 }
376
377 pub async fn send_access_details_private_message(
379 &self,
380 request_pubkey: &str,
381 details: AccessDetailsContent,
382 message_type: &str,
383 ) -> Result<String> {
384 let details_json = serde_json::to_string(&details)?;
385 self.send_encrypted_private_message(request_pubkey, details_json, message_type)
386 .await
387 }
388
389 pub async fn send_status_response(
391 &self,
392 request_pubkey: &str,
393 response: StatusResponseContent,
394 message_type: &str,
395 ) -> Result<String> {
396 let response_json = serde_json::to_string(&response)?;
397 self.send_encrypted_private_message(request_pubkey, response_json, message_type)
398 .await
399 }
400
401 pub async fn send_error_response(
403 &self,
404 request_pubkey: &str,
405 error_type: &str,
406 message: &str,
407 details: Option<&str>,
408 message_type: &str,
409 ) -> Result<String> {
410 let error = ErrorResponseContent {
411 error_type: error_type.to_string(),
412 message: message.to_string(),
413 details: details.map(|s| s.to_string()),
414 };
415 self.send_error_response_private_message(request_pubkey, error, message_type)
416 .await
417 }
418
419 pub async fn send_error_response_private_message(
421 &self,
422 request_pubkey: &str,
423 error: ErrorResponseContent,
424 message_type: &str,
425 ) -> Result<String> {
426 let error_json = serde_json::to_string(&error)?;
427 self.send_encrypted_private_message(request_pubkey, error_json, message_type)
428 .await
429 }
430
431 pub async fn send_topup_response_private_message(
433 &self,
434 request_pubkey: &str,
435 response: TopUpResponseContent,
436 message_type: &str,
437 ) -> Result<String> {
438 let response_json = serde_json::to_string(&response)?;
439 self.send_encrypted_private_message(request_pubkey, response_json, message_type)
440 .await
441 }
442
443 pub fn client(&self) -> &Client {
445 &self.client
446 }
447
448 pub fn get_service_public_key(&self) -> String {
450 self.keys.public_key().to_hex()
451 }
452
453 #[allow(dead_code)]
454 fn convert_event(&self, event: &nostr_sdk::Event) -> NostrEvent {
455 NostrEvent {
456 id: event.id.to_hex(),
457 pubkey: event.pubkey.to_hex(),
458 created_at: event.created_at.as_u64(),
459 kind: event.kind.as_u16() as u32,
460 tags: event
461 .tags
462 .iter()
463 .map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect())
464 .collect(),
465 content: event.content.clone(),
466 sig: event.sig.to_string(),
467 message_type: "unknown".to_string(),
468 }
469 }
470
471 pub async fn wait_for_decrypted_message(
473 &self,
474 sender_pubkey: &str,
475 timeout_secs: u64,
476 ) -> Result<NostrEvent> {
477 let sender_pk = nostr_sdk::PublicKey::parse(sender_pubkey)?;
478 let receiver_pk = self.keys.public_key();
479
480 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
481 let tx = Arc::new(Mutex::new(Some(tx)));
482 let client = self.client.clone();
483 let receiver_keys = self.keys.clone();
484 let timeout = tokio::time::Duration::from_secs(timeout_secs);
485
486 let subscribe_since = std::time::SystemTime::now()
498 .duration_since(std::time::UNIX_EPOCH)
499 .map(|d| d.as_secs())
500 .unwrap_or(0)
501 .saturating_sub(60);
502 let filter = Filter::new()
503 .pubkeys(vec![receiver_pk])
504 .kinds(vec![Kind::EncryptedDirectMessage, Kind::GiftWrap])
505 .since(nostr_sdk::Timestamp::from_secs(subscribe_since));
506
507 let _ = client.subscribe(filter, None).await;
508
509 let result = tokio::select! {
511 notification_res = client.handle_notifications(|notification| {
512 let tx = tx.clone();
513 let receiver_keys = receiver_keys.clone();
514 let sender_pk = sender_pk.clone();
515 let client = client.clone();
516
517 async move {
518 if let RelayPoolNotification::Event { event, .. } = notification {
519 let mut event_to_send = None;
520
521 match event.kind {
522 Kind::GiftWrap => {
523 if let Ok(UnwrappedGift { rumor, sender }) = client.unwrap_gift_wrap(&event).await {
525 if sender == sender_pk && rumor.kind == Kind::PrivateDirectMessage {
526 event_to_send = Some(NostrEvent {
527 id: rumor.id.map(|id| id.to_hex()).unwrap_or_default(),
528 pubkey: sender.to_hex(),
529 created_at: rumor.created_at.as_u64(),
530 kind: rumor.kind.as_u16() as u32,
531 tags: rumor.tags.iter().map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect()).collect(),
532 content: rumor.content,
533 sig: String::new(),
534 message_type: "nip17".to_string(),
535 });
536 }
537 }
538 }
539 Kind::EncryptedDirectMessage => {
540 if event.pubkey == sender_pk {
541 let secret_key = receiver_keys.secret_key();
542 if let Ok(content) = nip04::decrypt(secret_key, &event.pubkey, &event.content) {
543 event_to_send = Some(NostrEvent {
544 id: event.id.to_hex(),
545 pubkey: event.pubkey.to_hex(),
546 created_at: event.created_at.as_u64(),
547 kind: event.kind.as_u16() as u32,
548 tags: event.tags.iter().map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect()).collect(),
549 content,
550 sig: event.sig.to_string(),
551 message_type: "nip04".to_string(),
552 });
553 }
554 }
555 }
556 _ => {}
557 }
558
559 if let Some(ev) = event_to_send {
560 let mut lock = tx.lock().await;
561 if let Some(sender) = lock.take() {
562 let _ = sender.send(ev).await;
563 return Ok(true); }
565 }
566 }
567 Ok(false)
568 }
569 }) => {
570 match notification_res {
571 Ok(_) => rx.recv().await.ok_or_else(|| anyhow::anyhow!("Channel closed")),
572 Err(e) => Err(anyhow::anyhow!("Notification handler error: {}", e)),
573 }
574 }
575 _ = tokio::time::sleep(timeout) => {
576 Err(anyhow::anyhow!("Timeout waiting for response from {}", sender_pubkey))
577 }
578 };
579
580 result
581 }
582}
583
584pub fn default_relay_config() -> RelayConfig {
585 RelayConfig {
586 relays: vec![
587 "wss://relay.damus.io".to_string(),
588 "wss://nos.lol".to_string(),
589 "wss://relay.nostr.band".to_string(),
590 ],
591 private_key: None,
592 }
593}
594
595pub fn custom_relay_config(relays: Vec<String>, private_key: Option<String>) -> RelayConfig {
596 RelayConfig {
597 relays,
598 private_key,
599 }
600}
601
602#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct PodSpec {
604 pub id: String, pub name: String, pub description: String, pub cpu_millicores: u64, pub memory_mb: u64, pub rate_msats_per_sec: u64, }
611
612#[derive(Debug, Clone, Serialize, Deserialize)]
613pub struct OfferEventContent {
614 pub minimum_duration_seconds: u64,
615 pub whitelisted_mints: Vec<String>,
616 pub pod_specs: Vec<PodSpec>, }
618
619#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
623pub struct TemplateAccessPort {
624 pub host_port: u16,
626 pub container_port: u16,
629 pub protocol: String,
631 pub label: String,
635}
636
637#[derive(Debug, Clone, Serialize, Deserialize)]
638pub struct AccessDetailsContent {
639 pub pod_npub: String, pub node_port: u16, pub expires_at: String, pub cpu_millicores: u64, pub memory_mb: u64, pub pod_spec_name: String, pub pod_spec_description: String, pub instructions: Vec<String>, #[serde(default, skip_serializing_if = "String::is_empty")]
653 pub host_address: String,
654
655 #[serde(default, skip_serializing_if = "Vec::is_empty")]
659 pub template_ports: Vec<TemplateAccessPort>,
660}
661
662#[derive(Debug, Clone, Serialize, Deserialize)]
663pub struct ErrorResponseContent {
664 pub error_type: String, pub message: String, pub details: Option<String>, }
668
669#[derive(Debug, Clone, Serialize, Deserialize)]
670pub struct TopUpResponseContent {
671 pub success: bool,
672 pub pod_npub: String,
673 pub extended_duration_seconds: u64,
674 pub new_expires_at: String,
675 pub message: String,
676}
677
678#[derive(Debug, Clone, Serialize, Deserialize)]
680pub struct EncryptedSpawnPodRequest {
681 pub cashu_token: String,
682 pub pod_spec_id: Option<String>, pub pod_image: String, pub ssh_username: String,
685 pub ssh_password: String,
686
687 #[serde(default, skip_serializing_if = "Option::is_none")]
696 pub template_slug: Option<String>,
697
698 #[serde(default, skip_serializing_if = "Option::is_none")]
710 pub replication: Option<crate::durable_workload::ReplicationMode>,
711
712 #[serde(default, skip_serializing_if = "Option::is_none")]
719 pub primary_npub: Option<String>,
720
721 #[serde(default, skip_serializing_if = "Option::is_none")]
729 pub workload_id: Option<String>,
730
731 #[serde(default, skip_serializing_if = "Option::is_none")]
755 pub volume_encryption: Option<VolumeEncryption>,
756}
757
758#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
772pub struct VolumeEncryption {
773 #[serde(default = "volume_encryption_default_version")]
777 pub version: u8,
778
779 pub algorithm: String,
781
782 pub key_b64: String,
787}
788
/// Serde default for `VolumeEncryption::version`.
fn volume_encryption_default_version() -> u8 {
    const DEFAULT_VERSION: u8 = 1;
    DEFAULT_VERSION
}
792
793impl VolumeEncryption {
794 pub const ALGORITHM_V1: &'static str = "luks2-aes-xts";
797 pub const VERSION_V1: u8 = 1;
798
799 pub fn v1(key: [u8; 32]) -> Self {
801 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
802 Self {
803 version: Self::VERSION_V1,
804 algorithm: Self::ALGORITHM_V1.to_string(),
805 key_b64: URL_SAFE_NO_PAD.encode(key),
806 }
807 }
808
809 pub fn decoded_key(&self) -> Result<[u8; 32], anyhow::Error> {
813 use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
814 let bytes = URL_SAFE_NO_PAD
815 .decode(self.key_b64.as_bytes())
816 .map_err(|e| anyhow::anyhow!("volume_encryption.key_b64 invalid base64: {}", e))?;
817 if bytes.len() != 32 {
818 anyhow::bail!(
819 "volume_encryption.key_b64 decoded to {} bytes, expected 32",
820 bytes.len()
821 );
822 }
823 let mut out = [0u8; 32];
824 out.copy_from_slice(&bytes);
825 Ok(out)
826 }
827}
828
/// Outcome of `warm_standby_role`: which part a provider plays for a workload.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WarmStandbyRole {
    /// The provider matches the primary npub.
    Primary,
    /// The provider appears in the standby list at position `index`
    /// (zero-based) out of `count` standbys.
    Standby { index: usize, count: usize },
    /// The provider is neither the primary nor a listed standby.
    NotAddressed,
}
844
845pub fn warm_standby_role(
846 self_npub: &str,
847 primary_npub: &str,
848 standby_providers: &[String],
849) -> WarmStandbyRole {
850 if npubs_equal(self_npub, primary_npub) {
864 return WarmStandbyRole::Primary;
865 }
866 for (idx, p) in standby_providers.iter().enumerate() {
867 if npubs_equal(self_npub, p) {
868 return WarmStandbyRole::Standby {
869 index: idx,
870 count: standby_providers.len(),
871 };
872 }
873 }
874 WarmStandbyRole::NotAddressed
875}
876
877pub fn npubs_equal(a: &str, b: &str) -> bool {
886 let pa = nostr_sdk::PublicKey::parse(a);
887 let pb = nostr_sdk::PublicKey::parse(b);
888 match (pa, pb) {
889 (Ok(ka), Ok(kb)) => ka == kb,
890 (Ok(_), Err(_)) | (Err(_), Ok(_)) => false,
897 (Err(_), Err(_)) => a == b,
901 }
902}
903
904#[derive(Debug, Clone, Serialize, Deserialize)]
906pub struct EncryptedTopUpPodRequest {
907 pub pod_npub: String, pub cashu_token: String,
909}
910
911pub async fn send_provisioning_request_private_message(
913 client: &Client,
914 service_pubkey: &str,
915 request: EncryptedSpawnPodRequest,
916) -> Result<String> {
917 let request_json = serde_json::to_string(&request)?;
918
919 let service_pubkey_parsed = nostr_sdk::PublicKey::parse(service_pubkey)?;
921 let event_id = client
922 .send_private_msg(service_pubkey_parsed, request_json, [])
923 .await?;
924
925 Ok(event_id.val.to_hex())
926}
927
928#[derive(Debug, Clone, Serialize, Deserialize)]
931#[serde(untagged)]
932pub enum PrivateRequest {
933 Spawn(EncryptedSpawnPodRequest),
934 TopUp(EncryptedTopUpPodRequest),
935 Status(StatusRequestContent),
936}
937
938pub fn parse_private_message_content(content: &str) -> Result<PrivateRequest> {
939 match serde_json::from_str::<PrivateRequest>(content) {
940 Ok(request) => Ok(request),
941 Err(e) => {
942 let truncated_content = if content.len() > 100 {
944 format!("{}...", &content[..100])
945 } else {
946 content.to_string()
947 };
948 Err(anyhow::anyhow!(
949 "JSON parsing failed: {}. Content: '{}'",
950 e,
951 truncated_content
952 ))
953 }
954 }
955}
956
957pub fn parse_revocation_event(event: &NostrEvent) -> Option<LeaseRevocationContent> {
965 if event.kind != KIND_LEASE_REVOCATION as u32 {
966 return None;
967 }
968 serde_json::from_str::<LeaseRevocationContent>(&event.content).ok()
969}
970
971#[derive(Debug, Clone, Serialize, Deserialize)]
975pub struct CapacityInfo {
976 pub cpu_available: u64, pub memory_mb_available: u64, pub storage_gb_available: u64, }
980
981#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
985#[serde(rename_all = "kebab-case")]
986pub enum IsolationLevel {
987 #[default]
989 SharedKernel,
990 DedicatedHost,
992 AttestedResearchTier,
994}
995
996impl IsolationLevel {
997 pub fn rank(self) -> u8 {
1006 match self {
1007 Self::SharedKernel => 0,
1008 Self::DedicatedHost => 1,
1009 Self::AttestedResearchTier => 2,
1010 }
1011 }
1012
1013 pub fn meets(self, min: IsolationLevel) -> bool {
1016 self.rank() >= min.rank()
1017 }
1018
1019 pub fn from_slug(s: &str) -> Option<Self> {
1021 match s {
1022 "shared-kernel" => Some(Self::SharedKernel),
1023 "dedicated-host" => Some(Self::DedicatedHost),
1024 "attested-research-tier" => Some(Self::AttestedResearchTier),
1025 _ => None,
1026 }
1027 }
1028
1029 pub fn slug(self) -> &'static str {
1032 match self {
1033 Self::SharedKernel => "shared-kernel",
1034 Self::DedicatedHost => "dedicated-host",
1035 Self::AttestedResearchTier => "attested-research-tier",
1036 }
1037 }
1038}
1039
1040fn default_schema_version() -> u8 {
1041 SCHEMA_VERSION
1042}
1043
1044#[derive(Debug, Clone, Serialize, Deserialize)]
1049pub struct ProviderOfferContent {
1050 pub provider_npub: String,
1051 pub hostname: String,
1052 pub location: Option<String>,
1053 pub capabilities: Vec<String>, pub specs: Vec<PodSpec>,
1055 pub whitelisted_mints: Vec<String>,
1056 pub uptime_percent: f32,
1057 pub total_jobs_completed: u64,
1058 pub api_endpoint: Option<String>,
1059
1060 #[serde(default = "default_schema_version")]
1063 pub version: u8,
1064
1065 #[serde(default)]
1067 pub isolation_level: IsolationLevel,
1068
1069 #[serde(default, skip_serializing_if = "Option::is_none")]
1072 pub stake_proof: Option<crate::stake::StakeProof>,
1073}
1074
1075#[derive(Debug, Clone, Serialize, Deserialize)]
1082pub struct HeartbeatContent {
1083 pub provider_npub: String,
1084 pub timestamp: u64,
1085 pub active_workloads: u32,
1086 pub available_capacity: CapacityInfo,
1087
1088 #[serde(default = "default_schema_version")]
1090 pub version: u8,
1091}
1092
1093#[derive(Debug, Clone, Serialize, Deserialize)]
1103pub struct LeaseRevocationContent {
1104 pub workload_id: String,
1113 pub primary_provider_npub: String,
1114 pub standby_providers: Vec<String>,
1115 pub reason: String,
1116 pub revoked_at: u64,
1117
1118 #[serde(default, skip_serializing_if = "Option::is_none")]
1122 pub state_uri: Option<String>,
1123
1124 #[serde(default = "default_schema_version")]
1126 pub version: u8,
1127}
1128
1129#[derive(Debug, Clone, Serialize, Deserialize)]
1137pub struct StandbyPromotionAnnouncementContent {
1138 pub workload_id: String,
1142 pub new_primary_npub: String,
1146 pub promoted_at: u64,
1148 #[serde(default = "default_schema_version")]
1149 pub version: u8,
1150}
1151
1152#[derive(Debug, Clone, Serialize, Deserialize)]
1154pub struct ProviderInfo {
1155 pub npub: String,
1156 pub hostname: String,
1157 pub location: Option<String>,
1158 pub capabilities: Vec<String>,
1159 pub specs: Vec<PodSpec>,
1160 pub whitelisted_mints: Vec<String>,
1161 pub uptime_percent: f32,
1162 pub total_jobs_completed: u64,
1163 pub last_seen: u64, pub is_online: bool,
1165 pub isolation_level: IsolationLevel,
1170}
1171
1172#[derive(Debug, Clone, Default)]
1174pub struct ProviderFilter {
1175 pub capability: Option<String>,
1176 pub min_uptime: Option<f32>,
1177 pub min_memory_mb: Option<u64>,
1178 pub min_cpu: Option<u64>,
1179 pub isolation_level: Option<IsolationLevel>,
1183}
1184
1185impl NostrRelaySubscriber {
1186 pub async fn publish_provider_offer(&self, offer: ProviderOfferContent) -> Result<String> {
1191 let content = serde_json::to_string(&offer)?;
1192 info!("Publishing provider offer for {}", offer.hostname);
1193
1194 let d_tag = format!("paygress:offer:v{}:{}", offer.version, offer.provider_npub);
1195 let tags = vec![
1196 Tag::hashtag("paygress"),
1197 Tag::hashtag("compute"),
1198 Tag::parse(["d", d_tag.as_str()])?,
1199 Tag::parse(["v", offer.version.to_string().as_str()])?,
1200 ];
1201
1202 let event = EventBuilder::new(Kind::Custom(KIND_PROVIDER_OFFER), content)
1203 .tags(tags)
1204 .sign_with_keys(&self.keys)?;
1205 let event_id = event.id.to_hex();
1206
1207 match self.client.send_event(&event).await {
1208 Ok(res) => {
1209 info!("✅ Published provider offer: {} ({:?})", event_id, res);
1210 Ok(event_id)
1211 }
1212 Err(e) => {
1213 error!("❌ Failed to publish provider offer: {}", e);
1214 Err(e.into())
1215 }
1216 }
1217 }
1218
1219 pub async fn publish_heartbeat(
1232 &self,
1233 heartbeat: HeartbeatContent,
1234 ) -> Result<(String, Vec<String>)> {
1235 let content = serde_json::to_string(&heartbeat)?;
1236 let bucket = heartbeat.timestamp / HEARTBEAT_BUCKET_SECS;
1237 let d_tag = format!(
1238 "paygress:heartbeat:v{}:{}:{}",
1239 heartbeat.version, heartbeat.provider_npub, bucket
1240 );
1241
1242 let provider_pk = nostr_sdk::PublicKey::parse(&heartbeat.provider_npub)?;
1243 let v_tag = heartbeat.version.to_string();
1244
1245 let stored_tags = vec![
1248 Tag::hashtag("paygress-heartbeat"),
1249 Tag::public_key(provider_pk),
1250 Tag::parse(["d", d_tag.as_str()])?,
1251 Tag::parse(["v", v_tag.as_str()])?,
1252 ];
1253 let stored_event =
1254 EventBuilder::new(Kind::Custom(KIND_PROVIDER_HEARTBEAT), content.clone())
1255 .tags(stored_tags)
1256 .sign_with_keys(&self.keys)?;
1257 let stored_id = stored_event.id.to_hex();
1258
1259 let ephemeral_tags = vec![
1262 Tag::hashtag("paygress-heartbeat"),
1263 Tag::public_key(provider_pk),
1264 Tag::parse(["v", v_tag.as_str()])?,
1265 ];
1266 let ephemeral_event =
1267 EventBuilder::new(Kind::Custom(KIND_PROVIDER_HEARTBEAT_EPHEMERAL), content)
1268 .tags(ephemeral_tags)
1269 .sign_with_keys(&self.keys)?;
1270
1271 let mut accepting_relays: Vec<String> = Vec::new();
1272 match self.client.send_event(&stored_event).await {
1273 Ok(out) => {
1274 debug!("📦 Stored heartbeat published: {}", stored_id);
1275 accepting_relays = out.success.iter().map(|u| u.to_string()).collect();
1276 }
1277 Err(e) => warn!("Failed to publish stored heartbeat: {}", e),
1278 }
1279 match self.client.send_event(&ephemeral_event).await {
1280 Ok(_) => debug!("⚡ Ephemeral heartbeat published"),
1281 Err(e) => warn!("Failed to publish ephemeral heartbeat: {}", e),
1282 }
1283
1284 info!(
1285 "💓 Heartbeat published (stored + ephemeral): {} accepted by {} relay(s)",
1286 stored_id,
1287 accepting_relays.len()
1288 );
1289 Ok((stored_id, accepting_relays))
1290 }
1291
1292 pub async fn publish_lease_revocation(
1301 &self,
1302 revocation: LeaseRevocationContent,
1303 ) -> Result<String> {
1304 let content = serde_json::to_string(&revocation)?;
1305 let d_tag = format!(
1306 "paygress:revocation:v{}:{}:{}",
1307 revocation.version, revocation.primary_provider_npub, revocation.workload_id
1308 );
1309 let v_tag = revocation.version.to_string();
1310 let workload_id_str = revocation.workload_id.to_string();
1311
1312 let mut tags = vec![
1313 Tag::hashtag("paygress"),
1314 Tag::hashtag("paygress-revocation"),
1315 Tag::parse(["d", d_tag.as_str()])?,
1316 Tag::parse(["v", v_tag.as_str()])?,
1317 Tag::parse(["workload", workload_id_str.as_str()])?,
1318 ];
1319 for standby_npub in &revocation.standby_providers {
1320 if let Ok(pk) = nostr_sdk::PublicKey::parse(standby_npub) {
1321 tags.push(Tag::public_key(pk));
1322 } else {
1323 warn!(
1324 "Skipping unparseable standby npub in revocation: {}",
1325 standby_npub
1326 );
1327 }
1328 }
1329
1330 let event = EventBuilder::new(Kind::Custom(KIND_LEASE_REVOCATION), content)
1331 .tags(tags)
1332 .sign_with_keys(&self.keys)?;
1333 let event_id = event.id.to_hex();
1334
1335 match self.client.send_event(&event).await {
1336 Ok(out) => {
1337 info!(
1338 "📜 Lease revocation published for workload {}: {} accepted by {} relay(s)",
1339 revocation.workload_id,
1340 event_id,
1341 out.success.len()
1342 );
1343 Ok(event_id)
1344 }
1345 Err(e) => {
1346 error!("Failed to publish lease revocation: {}", e);
1347 Err(e.into())
1348 }
1349 }
1350 }
1351
1352 pub async fn publish_standby_promotion_announcement(
1357 &self,
1358 announcement: StandbyPromotionAnnouncementContent,
1359 ) -> Result<String> {
1360 let content = serde_json::to_string(&announcement)?;
1361 let d_tag = format!(
1362 "paygress:promoted:v{}:{}",
1363 announcement.version, announcement.workload_id
1364 );
1365 let v_tag = announcement.version.to_string();
1366 let tags = vec![
1367 Tag::hashtag("paygress"),
1368 Tag::hashtag("paygress-promoted"),
1369 Tag::parse(["d", d_tag.as_str()])?,
1370 Tag::parse(["v", v_tag.as_str()])?,
1371 Tag::parse(["workload", announcement.workload_id.as_str()])?,
1372 ];
1373
1374 let event = EventBuilder::new(Kind::Custom(KIND_STANDBY_PROMOTION_ANNOUNCEMENT), content)
1375 .tags(tags)
1376 .sign_with_keys(&self.keys)?;
1377 let event_id = event.id.to_hex();
1378
1379 match self.client.send_event(&event).await {
1380 Ok(out) => {
1381 info!(
1382 "📢 Standby promotion announcement published for workload {}: {} accepted by {} relay(s)",
1383 announcement.workload_id,
1384 event_id,
1385 out.success.len()
1386 );
1387 Ok(event_id)
1388 }
1389 Err(e) => {
1390 error!("Failed to publish standby promotion announcement: {}", e);
1391 Err(e.into())
1392 }
1393 }
1394 }
1395
1396 pub async fn query_standby_promotion_announcements(
1403 &self,
1404 workload_id: &str,
1405 peer_npubs: &[String],
1406 ) -> Result<Option<StandbyPromotionAnnouncementContent>> {
1407 if peer_npubs.is_empty() {
1408 return Ok(None);
1409 }
1410 let mut authors = Vec::new();
1411 for npub in peer_npubs {
1412 if let Ok(pk) = nostr_sdk::PublicKey::parse(npub) {
1413 authors.push(pk);
1414 }
1415 }
1416 if authors.is_empty() {
1417 return Ok(None);
1418 }
1419
1420 let filter = Filter::new()
1421 .kind(Kind::Custom(KIND_STANDBY_PROMOTION_ANNOUNCEMENT))
1422 .authors(authors)
1423 .custom_tag(
1424 nostr_sdk::SingleLetterTag::lowercase(nostr_sdk::Alphabet::D),
1425 format!("paygress:promoted:v{}:{}", SCHEMA_VERSION, workload_id),
1426 );
1427
1428 let events = self
1429 .client
1430 .fetch_events(filter, std::time::Duration::from_secs(5))
1431 .await?;
1432
1433 for event in events.iter() {
1434 if let Ok(content) =
1435 serde_json::from_str::<StandbyPromotionAnnouncementContent>(&event.content)
1436 {
1437 if content.workload_id == workload_id {
1438 return Ok(Some(content));
1439 }
1440 }
1441 }
1442 Ok(None)
1443 }
1444
1445 pub async fn query_providers(&self) -> Result<Vec<ProviderOfferContent>> {
1447 let filter = Filter::new()
1448 .kind(Kind::Custom(KIND_PROVIDER_OFFER))
1449 .hashtag("paygress");
1450
1451 let events = self
1452 .client
1453 .fetch_events(filter, std::time::Duration::from_secs(5))
1454 .await?;
1455
1456 let mut providers = Vec::new();
1457 for event in events {
1458 match serde_json::from_str::<ProviderOfferContent>(&event.content) {
1459 Ok(offer) => providers.push(offer),
1460 Err(e) => {
1461 warn!("Failed to parse provider offer {}: {}", event.id, e);
1462 }
1463 }
1464 }
1465
1466 info!("Found {} providers", providers.len());
1467 Ok(providers)
1468 }
1469
1470 pub async fn query_heartbeats(
1472 &self,
1473 provider_npub: &str,
1474 since_secs: u64,
1475 ) -> Result<Vec<HeartbeatContent>> {
1476 let provider_pubkey = nostr_sdk::PublicKey::parse(provider_npub)?;
1477
1478 let filter = Filter::new()
1479 .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
1480 .author(provider_pubkey)
1481 .since(Timestamp::from(since_secs));
1482
1483 let events = self
1484 .client
1485 .fetch_events(filter, std::time::Duration::from_secs(5))
1486 .await?;
1487
1488 let mut heartbeats = Vec::new();
1489 for event in events {
1490 match serde_json::from_str::<HeartbeatContent>(&event.content) {
1491 Ok(hb) => heartbeats.push(hb),
1492 Err(e) => {
1493 warn!("Failed to parse heartbeat {}: {}", event.id, e);
1494 }
1495 }
1496 }
1497
1498 Ok(heartbeats)
1499 }
1500
1501 pub async fn get_latest_heartbeat(
1508 &self,
1509 provider_npub: &str,
1510 ) -> Result<Option<HeartbeatContent>> {
1511 let provider_pubkey = nostr_sdk::PublicKey::parse(provider_npub)?;
1512
1513 let live_since = std::time::SystemTime::now()
1514 .duration_since(std::time::UNIX_EPOCH)?
1515 .as_secs()
1516 - LIVE_HEARTBEAT_WINDOW_SECS;
1517
1518 let filter = Filter::new()
1519 .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
1520 .author(provider_pubkey)
1521 .since(Timestamp::from(live_since))
1522 .limit(1);
1523
1524 let events = self
1525 .client
1526 .fetch_events(filter, std::time::Duration::from_secs(3))
1527 .await?;
1528
1529 if let Some(event) = events.first() {
1530 match serde_json::from_str::<HeartbeatContent>(&event.content) {
1531 Ok(hb) => return Ok(Some(hb)),
1532 Err(e) => warn!("Failed to parse heartbeat: {}", e),
1533 }
1534 }
1535
1536 Ok(None)
1537 }
1538
1539 pub async fn get_latest_heartbeats_multi(
1541 &self,
1542 provider_npubs: Vec<String>,
1543 ) -> Result<std::collections::HashMap<String, HeartbeatContent>> {
1544 if provider_npubs.is_empty() {
1545 return Ok(std::collections::HashMap::new());
1546 }
1547
1548 let mut pubkeys = Vec::new();
1549 for npub in provider_npubs {
1550 if let Ok(pk) = nostr_sdk::PublicKey::parse(&npub) {
1551 pubkeys.push(pk);
1552 }
1553 }
1554
1555 let live_since = std::time::SystemTime::now()
1556 .duration_since(std::time::UNIX_EPOCH)?
1557 .as_secs()
1558 - LIVE_HEARTBEAT_WINDOW_SECS;
1559
1560 let filter = Filter::new()
1565 .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
1566 .authors(pubkeys)
1567 .since(Timestamp::from(live_since));
1568
1569 let events = self
1571 .client
1572 .fetch_events(filter, std::time::Duration::from_secs(3))
1573 .await?;
1574
1575 let mut heartbeats = std::collections::HashMap::new();
1576
1577 for event in events {
1579 if let Ok(hb) = serde_json::from_str::<HeartbeatContent>(&event.content) {
1580 match heartbeats.entry(hb.provider_npub.clone()) {
1581 std::collections::hash_map::Entry::Occupied(mut entry) => {
1582 let existing: &HeartbeatContent = entry.get();
1583 if hb.timestamp > existing.timestamp {
1584 entry.insert(hb);
1585 }
1586 }
1587 std::collections::hash_map::Entry::Vacant(entry) => {
1588 entry.insert(hb);
1589 }
1590 }
1591 }
1592 }
1593
1594 Ok(heartbeats)
1595 }
1596
1597 pub async fn calculate_uptime(&self, provider_npub: &str, days: u32) -> Result<f32> {
1603 let now = std::time::SystemTime::now()
1604 .duration_since(std::time::UNIX_EPOCH)?
1605 .as_secs();
1606 let since = now - (days as u64 * 24 * 60 * 60);
1607
1608 let heartbeats = self.query_heartbeats(provider_npub, since).await?;
1609
1610 if heartbeats.is_empty() {
1611 return Ok(0.0);
1612 }
1613
1614 let expected = (days as f32) * 24.0 * 3600.0 / HEARTBEAT_BUCKET_SECS as f32;
1618 let actual = heartbeats.len() as f32;
1619
1620 Ok((actual / expected * 100.0).min(100.0))
1621 }
1622}
1623
1624#[derive(Debug, Clone, Serialize, Deserialize)]
1625pub struct StatusRequestContent {
1626 pub pod_id: String, }
1628
1629#[derive(Debug, Clone, Serialize, Deserialize)]
1630pub struct StatusResponseContent {
1631 pub pod_id: String,
1632 pub status: String,
1633 pub expires_at: String,
1634 pub time_remaining_seconds: u64,
1635 pub cpu_millicores: u64,
1636 pub memory_mb: u64,
1637 pub ssh_host: String,
1638 pub ssh_port: u16,
1639 pub ssh_username: String,
1640}
1641
#[cfg(test)]
mod isolation_level_tests {
    use super::IsolationLevel;

    /// `rank()` must strictly increase along
    /// `SharedKernel` < `DedicatedHost` < `AttestedResearchTier`.
    #[test]
    fn rank_orders_isolation_strength() {
        let shared = IsolationLevel::SharedKernel.rank();
        let dedicated = IsolationLevel::DedicatedHost.rank();
        let attested = IsolationLevel::AttestedResearchTier.rank();

        assert!(shared < dedicated);
        assert!(dedicated < attested);
    }

    /// A tier meets a requirement exactly when it is the same tier or one
    /// that ranks above it; all nine combinations are checked.
    #[test]
    fn meets_accepts_equal_or_stricter_tiers() {
        // (offered tier, required tier, expected result of `offered.meets(required)`)
        let cases = [
            (
                IsolationLevel::SharedKernel,
                IsolationLevel::SharedKernel,
                true,
            ),
            (
                IsolationLevel::DedicatedHost,
                IsolationLevel::SharedKernel,
                true,
            ),
            (
                IsolationLevel::AttestedResearchTier,
                IsolationLevel::SharedKernel,
                true,
            ),
            (
                IsolationLevel::SharedKernel,
                IsolationLevel::DedicatedHost,
                false,
            ),
            (
                IsolationLevel::DedicatedHost,
                IsolationLevel::DedicatedHost,
                true,
            ),
            (
                IsolationLevel::AttestedResearchTier,
                IsolationLevel::DedicatedHost,
                true,
            ),
            (
                IsolationLevel::SharedKernel,
                IsolationLevel::AttestedResearchTier,
                false,
            ),
            (
                IsolationLevel::DedicatedHost,
                IsolationLevel::AttestedResearchTier,
                false,
            ),
            (
                IsolationLevel::AttestedResearchTier,
                IsolationLevel::AttestedResearchTier,
                true,
            ),
        ];

        for (case_no, (offered, required, expected)) in (1..).zip(cases) {
            assert_eq!(offered.meets(required), expected, "case #{}", case_no);
        }
    }

    /// Every tier's slug must parse back to the same tier.
    #[test]
    fn slug_round_trips() {
        let levels = [
            IsolationLevel::SharedKernel,
            IsolationLevel::DedicatedHost,
            IsolationLevel::AttestedResearchTier,
        ];

        for level in levels {
            let parsed = IsolationLevel::from_slug(level.slug());
            assert_eq!(parsed, Some(level));
        }
    }

    /// Unknown, empty, and underscore-spelled slugs must all be rejected.
    #[test]
    fn from_slug_rejects_unknown() {
        for slug in ["paranoid-mode", "", "dedicated_host"] {
            assert!(IsolationLevel::from_slug(slug).is_none());
        }
    }
}
1687
#[cfg(test)]
mod npubs_equal_tests {
    use super::*;

    /// Bech32 (`npub1…`) form of the key used throughout these tests.
    const PUBKEY_BECH32: &str = "npub1ae40uj62de87f8tvx56e6ytp5m7jd7l96mh0ew43e8q5wucm7z9q2uqvuc";
    /// Hex form that the cross-encoding test below expects to equal
    /// `PUBKEY_BECH32`.
    const PUBKEY_HEX: &str = "ee6afe4b4a6e4fe49d6c35359d1161a6fd26fbe5d6eefcbab1c9c147731bf08a";

    /// A bech32 key compares equal to itself.
    #[test]
    fn bech32_matches_itself() {
        let key = PUBKEY_BECH32;
        assert!(npubs_equal(key, key));
    }

    /// A hex key compares equal to itself.
    #[test]
    fn hex_matches_itself() {
        let key = PUBKEY_HEX;
        assert!(npubs_equal(key, key));
    }

    /// The two encodings of one key compare equal in either argument order.
    #[test]
    fn bech32_matches_hex_for_same_key() {
        let orderings = [(PUBKEY_BECH32, PUBKEY_HEX), (PUBKEY_HEX, PUBKEY_BECH32)];
        for (left, right) in orderings {
            assert!(npubs_equal(left, right));
        }
    }

    /// A hex key and a different key's bech32 form do not compare equal.
    #[test]
    fn different_keys_in_different_encodings_do_not_match() {
        const OTHER_BECH32: &str =
            "npub1hyr9m7zeegr98w4e07gvdpqrk25jfp3vku8029u8pcxsc48dq6nqxtwztv";
        assert!(!npubs_equal(PUBKEY_HEX, OTHER_BECH32));
    }

    /// Two short `npub1…`-prefixed placeholders compare equal only when the
    /// strings are identical.
    #[test]
    fn unparseable_strings_fall_back_to_string_equality() {
        let placeholder = "npub1primary";
        assert!(npubs_equal(placeholder, "npub1primary"));
        assert!(!npubs_equal(placeholder, "npub1secondary"));
    }

    /// A real key never compares equal to the placeholder string, whichever
    /// side it is on.
    #[test]
    fn one_real_one_typoed_returns_false() {
        let mismatches = [
            (PUBKEY_BECH32, "npub1primary"),
            ("npub1primary", PUBKEY_HEX),
        ];
        for (left, right) in mismatches {
            assert!(!npubs_equal(left, right));
        }
    }
}