use anyhow::{Context, Result};
use nostr_sdk::nips::nip04;
use nostr_sdk::nips::nip59::UnwrappedGift;
use nostr_sdk::{
Client, EventBuilder, Filter, Keys, Kind, RelayPoolNotification, Tag, Timestamp, ToBech32,
};
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
/// Event kind used for provider offers (`publish_provider_offer`, `query_providers`).
pub const KIND_PROVIDER_OFFER: u16 = 38383;
/// Event kind used for the stored heartbeat published by `publish_heartbeat` and
/// read back by the heartbeat queries.
pub const KIND_PROVIDER_HEARTBEAT: u16 = 38384;
/// Event kind used for the second, "ephemeral" heartbeat event that
/// `publish_heartbeat` sends alongside the stored one.
pub const KIND_PROVIDER_HEARTBEAT_EPHEMERAL: u16 = 20384;
/// Event kind used for lease revocation events (`publish_lease_revocation`,
/// `subscribe_to_pod_events`, `parse_revocation_event`).
pub const KIND_LEASE_REVOCATION: u16 = 38385;
/// Event kind used for standby promotion announcements.
pub const KIND_STANDBY_PROMOTION_ANNOUNCEMENT: u16 = 38386;
/// Schema version used as the serde default for the `version` field of the
/// versioned payload structs in this file.
pub const SCHEMA_VERSION: u8 = 1;
/// Look-back window, in seconds, applied by `get_latest_heartbeat` and
/// `get_latest_heartbeats_multi` when querying heartbeats.
pub const LIVE_HEARTBEAT_WINDOW_SECS: u64 = 300;
/// Width, in seconds, of the bucket a heartbeat timestamp is divided into when
/// building its `d` tag; also the expected heartbeat interval in `calculate_uptime`.
pub const HEARTBEAT_BUCKET_SECS: u64 = 60;
/// Connection settings consumed by `NostrRelaySubscriber::new`.
#[derive(Clone, Debug)]
pub struct RelayConfig {
/// Relay URLs that are added to the client, in order.
pub relays: Vec<String>,
/// Private key handed to `Keys::parse`. When `None` or empty, a fresh key pair
/// is generated instead.
pub private_key: Option<String>,
}
/// Plain-string view of a received event that is handed to message handlers.
///
/// For private messages, `content` holds the already decrypted payload.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct NostrEvent {
/// Hex event id (NIP-17 rumors without an id get a placeholder or empty string).
pub id: String,
/// Hex public key of the event author / message sender.
pub pubkey: String,
/// Creation timestamp as reported by the event.
pub created_at: u64,
/// Numeric event kind.
pub kind: u32,
/// Event tags, each flattened to its list of string parts.
pub tags: Vec<Vec<String>>,
/// Event content (decrypted for NIP-04 / NIP-17 messages).
pub content: String,
/// Event signature; `"unsigned"` or empty for NIP-17 rumors, which carry none here.
pub sig: String,
/// Origin marker set by this file: `"nip04"`, `"nip17"`, `"lease_revocation"` or `"unknown"`.
pub message_type: String, }
/// Relay client plus the key pair it signs and decrypts with.
#[derive(Clone)]
pub struct NostrRelaySubscriber {
// Client constructed from `keys` in `new`.
client: Client,
// Service identity: used for signing events and NIP-04 encryption/decryption.
keys: Keys,
}
impl NostrRelaySubscriber {
/// Creates a subscriber from `config`, registers its relays and starts connecting.
///
/// A non-empty `config.private_key` is parsed into the service keys; otherwise a
/// fresh key pair is generated. After `connect()` the function sleeps for one
/// second before logging the relay count and the service npub.
///
/// # Errors
/// Fails when the private key cannot be parsed or a relay cannot be added.
pub async fn new(config: RelayConfig) -> Result<Self> {
    let keys = match config.private_key.as_deref() {
        Some(raw_key) if !raw_key.is_empty() => {
            // Both encodings go through the same parser; only the error text differs.
            let failure_msg = if raw_key.starts_with("nsec1") {
                "Invalid nsec private key format"
            } else {
                "Invalid private key format"
            };
            Keys::parse(raw_key).context(failure_msg)?
        }
        _ => Keys::generate(),
    };

    let client = Client::new(keys.clone());
    let relay_count = config.relays.len();

    for relay_url in &config.relays {
        info!("Adding relay: {}", relay_url);
        let added = client.add_relay(relay_url).await;
        added.with_context(|| format!("Invalid relay URL: {}", relay_url))?;
    }

    info!("Connecting to {} relays...", relay_count);
    client.connect().await;
    tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
    info!("Connected to {} relays", relay_count);

    let npub = keys.public_key().to_bech32().unwrap();
    info!("Service public key (npub): {}", npub);

    Ok(Self { client, keys })
}
/// Returns the public key of the service key pair.
pub fn public_key(&self) -> nostr_sdk::PublicKey {
self.keys.public_key()
}
pub async fn subscribe_to_pod_events<F>(&self, handler: F) -> Result<()>
where
F: Fn(NostrEvent) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>
+ Send
+ Sync
+ 'static,
{
let nip04_filter = Filter::new()
.kind(Kind::EncryptedDirectMessage)
.pubkeys(vec![self.keys.public_key()]) .limit(0);
let nip17_filter = Filter::new()
.kind(Kind::GiftWrap)
.pubkeys(vec![self.keys.public_key()]) .limit(0);
let revocation_filter = Filter::new()
.kind(Kind::Custom(KIND_LEASE_REVOCATION))
.pubkeys(vec![self.keys.public_key()])
.limit(0);
let _ = self.client.subscribe(nip04_filter, None).await;
let _ = self.client.subscribe(nip17_filter, None).await;
let _ = self.client.subscribe(revocation_filter, None).await;
info!("Subscribed to NIP-04 / NIP-17 messages and KIND_LEASE_REVOCATION events addressed to this provider");
self.client.handle_notifications(|notification| async {
if let RelayPoolNotification::Event { relay_url: _, subscription_id: _, event } = notification {
match event.kind {
Kind::GiftWrap => {
info!("Received NIP-17 Gift Wrap message: {}", event.id);
match self.client.unwrap_gift_wrap(&event).await {
Ok(UnwrappedGift { rumor, sender }) => {
info!("Unwrapped Gift Wrap from sender: {}, rumor kind: {}", sender, rumor.kind);
if rumor.kind == Kind::PrivateDirectMessage {
debug!("NIP-17 rumor is PrivateDirectMessage. Content length: {}", rumor.content.len());
let nostr_event = NostrEvent {
id: rumor.id.map(|id| id.to_hex()).unwrap_or_else(|| "unknown".to_string()),
pubkey: rumor.pubkey.to_hex(),
created_at: rumor.created_at.as_u64(),
kind: rumor.kind.as_u16() as u32,
tags: rumor.tags.iter().map(|tag| {
tag.as_slice().iter().map(|s| s.to_string()).collect()
}).collect(),
content: rumor.content,
sig: "unsigned".to_string(), message_type: "nip17".to_string(), };
match handler(nostr_event).await {
Ok(()) => {
info!("Successfully processed NIP-17 private message: {}", event.id);
}
Err(e) => {
error!("Failed to process NIP-17 private message {}: {}", event.id, e);
}
}
} else {
info!("Rumor is not a private direct message, kind: {}", rumor.kind);
}
}
Err(e) => {
error!("Failed to unwrap Gift Wrap {}: {}", event.id, e);
}
}
}
Kind::EncryptedDirectMessage => {
info!("Received NIP-04 Encrypted Direct Message: {}", event.id);
let secret_key = self.keys.secret_key();
match nip04::decrypt(secret_key, &event.pubkey, &event.content) {
Ok(decrypted_content) => {
debug!(
"Decrypted NIP-04 message. Length: {}",
decrypted_content.len()
);
let nostr_event = NostrEvent {
id: event.id.to_hex(),
pubkey: event.pubkey.to_hex(),
created_at: event.created_at.as_u64(),
kind: event.kind.as_u16() as u32,
tags: event
.tags
.iter()
.map(|tag| {
tag.as_slice()
.iter()
.map(|s| s.to_string())
.collect()
})
.collect(),
content: decrypted_content,
sig: event.sig.to_string(),
message_type: "nip04".to_string(),
};
match handler(nostr_event).await {
Ok(()) => info!(
"Successfully processed NIP-04 private message: {}",
event.id
),
Err(e) => error!(
"Failed to process NIP-04 private message {}: {}",
event.id, e
),
}
}
Err(e) => {
error!(
"Failed to decrypt NIP-04 message {}: {}",
event.id, e
);
}
}
}
Kind::Custom(k) if k == KIND_LEASE_REVOCATION => {
info!("Received lease revocation event: {}", event.id);
let nostr_event = NostrEvent {
id: event.id.to_hex(),
pubkey: event.pubkey.to_hex(),
created_at: event.created_at.as_u64(),
kind: event.kind.as_u16() as u32,
tags: event
.tags
.iter()
.map(|tag| {
tag.as_slice().iter().map(|s| s.to_string()).collect()
})
.collect(),
content: event.content.clone(),
sig: event.sig.to_string(),
message_type: "lease_revocation".to_string(),
};
if let Err(e) = handler(nostr_event).await {
error!("Failed to process lease revocation {}: {}", event.id, e);
}
}
_ => {
info!("Received unsupported event kind: {}", event.kind);
}
}
}
Ok(false) }).await?;
Ok(())
}
/// Serializes `offer` to JSON and publishes it as a kind-999 event tagged
/// `paygress` and `offer`.
///
/// Returns the hex id of the signed event once `send_event` succeeds.
///
/// # Errors
/// Fails on JSON serialization, signing, or when sending the event fails (the
/// send failure is also logged).
pub async fn publish_offer(&self, offer: OfferEventContent) -> Result<String> {
    let payload = serde_json::to_string(&offer)?;
    info!("Publishing offer event with content: {}", payload);

    let offer_tags = vec![Tag::hashtag("paygress"), Tag::hashtag("offer")];
    info!("Creating event with kind 999 and {} tags", offer_tags.len());

    let signed = EventBuilder::new(Kind::Custom(999), payload)
        .tags(offer_tags)
        .sign_with_keys(&self.keys)?;
    let event_id = signed.id.to_hex();
    info!("Event created with ID: {}", event_id);
    info!("Sending offer event to relays: {}", event_id);

    let res = self.client.send_event(&signed).await.map_err(|e| {
        error!("❌ Failed to send offer event: {}", e);
        e
    })?;
    info!(
        "✅ Successfully published offer event: {} and {:?}",
        event_id, res
    );
    Ok(event_id)
}
/// Sends `content` privately to `receiver_pubkey`.
///
/// * `message_type == "nip04"`: the content is NIP-04 encrypted with the service
///   secret key and sent as an `EncryptedDirectMessage` carrying a receiver tag and
///   an `alt` tag.
/// * any other value (including `"nip17"`): the content is sent through
///   `Client::send_private_msg`.
///
/// Returns the hex id taken from the send output.
///
/// # Errors
/// Fails when `receiver_pubkey` cannot be parsed, or when encryption, signing or
/// sending fails.
pub async fn send_encrypted_private_message(
    &self,
    receiver_pubkey: &str,
    content: String,
    message_type: &str,
) -> Result<String> {
    let receiver_pubkey_parsed = nostr_sdk::PublicKey::parse(receiver_pubkey)?;
    match message_type {
        "nip04" => {
            let secret_key = self.keys.secret_key();
            let encrypted_content =
                nip04::encrypt(secret_key, &receiver_pubkey_parsed, &content)?;
            let receiver_tag = Tag::public_key(receiver_pubkey_parsed);
            let alt_tag = Tag::parse(["alt", "Private Message"])?;
            let event = EventBuilder::new(Kind::EncryptedDirectMessage, encrypted_content)
                .tags([receiver_tag, alt_tag])
                .sign_with_keys(&self.keys)?;
            let event_id = self.client.send_event(&event).await?;
            info!("Sent NIP-04 message to {}: {:?}", receiver_pubkey, event_id);
            Ok(event_id.val.to_hex())
        }
        // NIP-17 is the default transport: "nip17" and every unrecognized value land
        // here. (The former `"nip17" | _` pattern was equivalent but redundant.)
        _ => {
            let event_id = self
                .client
                .send_private_msg(receiver_pubkey_parsed, content, [])
                .await?;
            info!("Sent NIP-17 message to {}: {:?}", receiver_pubkey, event_id);
            Ok(event_id.val.to_hex())
        }
    }
}
/// Serializes `details` to JSON and delivers it to `request_pubkey` via
/// `send_encrypted_private_message`, using the transport named by `message_type`.
///
/// # Errors
/// Fails on JSON serialization or when the underlying send fails.
pub async fn send_access_details_private_message(
    &self,
    request_pubkey: &str,
    details: AccessDetailsContent,
    message_type: &str,
) -> Result<String> {
    let payload = serde_json::to_string(&details)?;
    let sent_id = self
        .send_encrypted_private_message(request_pubkey, payload, message_type)
        .await;
    sent_id
}
/// Serializes a status `response` to JSON and delivers it to `request_pubkey` via
/// `send_encrypted_private_message`.
///
/// # Errors
/// Fails on JSON serialization or when the underlying send fails.
pub async fn send_status_response(
    &self,
    request_pubkey: &str,
    response: StatusResponseContent,
    message_type: &str,
) -> Result<String> {
    let payload = serde_json::to_string(&response)?;
    let sent_id = self
        .send_encrypted_private_message(request_pubkey, payload, message_type)
        .await;
    sent_id
}
/// Builds an `ErrorResponseContent` from the given parts and sends it to
/// `request_pubkey` through `send_error_response_private_message`.
///
/// # Errors
/// Propagates any failure of the underlying send.
pub async fn send_error_response(
    &self,
    request_pubkey: &str,
    error_type: &str,
    message: &str,
    details: Option<&str>,
    message_type: &str,
) -> Result<String> {
    let payload = ErrorResponseContent {
        error_type: error_type.to_owned(),
        message: message.to_owned(),
        details: details.map(str::to_owned),
    };
    let sent_id = self
        .send_error_response_private_message(request_pubkey, payload, message_type)
        .await;
    sent_id
}
pub async fn send_error_response_private_message(
&self,
request_pubkey: &str,
error: ErrorResponseContent,
message_type: &str,
) -> Result<String> {
let error_json = serde_json::to_string(&error)?;
self.send_encrypted_private_message(request_pubkey, error_json, message_type)
.await
}
/// Serializes a top-up `response` to JSON and delivers it to `request_pubkey` via
/// `send_encrypted_private_message`.
///
/// # Errors
/// Fails on JSON serialization or when the underlying send fails.
pub async fn send_topup_response_private_message(
    &self,
    request_pubkey: &str,
    response: TopUpResponseContent,
    message_type: &str,
) -> Result<String> {
    let payload = serde_json::to_string(&response)?;
    let sent_id = self
        .send_encrypted_private_message(request_pubkey, payload, message_type)
        .await;
    sent_id
}
/// Borrows the underlying relay client.
pub fn client(&self) -> &Client {
&self.client
}
/// Returns the service public key as a hex string.
pub fn get_service_public_key(&self) -> String {
self.keys.public_key().to_hex()
}
/// Copies a raw event into a `NostrEvent` without decrypting anything;
/// `message_type` is set to `"unknown"`.
#[allow(dead_code)]
fn convert_event(&self, event: &nostr_sdk::Event) -> NostrEvent {
    // Flatten every tag into its list of string parts.
    let mut tags: Vec<Vec<String>> = Vec::new();
    for tag in event.tags.iter() {
        let parts: Vec<String> = tag.as_slice().iter().map(|part| part.to_string()).collect();
        tags.push(parts);
    }

    NostrEvent {
        id: event.id.to_hex(),
        pubkey: event.pubkey.to_hex(),
        created_at: event.created_at.as_u64(),
        kind: event.kind.as_u16() as u32,
        tags,
        content: event.content.clone(),
        sig: event.sig.to_string(),
        message_type: "unknown".to_string(),
    }
}
pub async fn wait_for_decrypted_message(
&self,
sender_pubkey: &str,
timeout_secs: u64,
) -> Result<NostrEvent> {
let sender_pk = nostr_sdk::PublicKey::parse(sender_pubkey)?;
let receiver_pk = self.keys.public_key();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let tx = Arc::new(Mutex::new(Some(tx)));
let client = self.client.clone();
let receiver_keys = self.keys.clone();
let timeout = tokio::time::Duration::from_secs(timeout_secs);
let subscribe_since = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
.saturating_sub(60);
let filter = Filter::new()
.pubkeys(vec![receiver_pk])
.kinds(vec![Kind::EncryptedDirectMessage, Kind::GiftWrap])
.since(nostr_sdk::Timestamp::from_secs(subscribe_since));
let _ = client.subscribe(filter, None).await;
let result = tokio::select! {
notification_res = client.handle_notifications(|notification| {
let tx = tx.clone();
let receiver_keys = receiver_keys.clone();
let sender_pk = sender_pk.clone();
let client = client.clone();
async move {
if let RelayPoolNotification::Event { event, .. } = notification {
let mut event_to_send = None;
match event.kind {
Kind::GiftWrap => {
if let Ok(UnwrappedGift { rumor, sender }) = client.unwrap_gift_wrap(&event).await {
if sender == sender_pk && rumor.kind == Kind::PrivateDirectMessage {
event_to_send = Some(NostrEvent {
id: rumor.id.map(|id| id.to_hex()).unwrap_or_default(),
pubkey: sender.to_hex(),
created_at: rumor.created_at.as_u64(),
kind: rumor.kind.as_u16() as u32,
tags: rumor.tags.iter().map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect()).collect(),
content: rumor.content,
sig: String::new(),
message_type: "nip17".to_string(),
});
}
}
}
Kind::EncryptedDirectMessage => {
if event.pubkey == sender_pk {
let secret_key = receiver_keys.secret_key();
if let Ok(content) = nip04::decrypt(secret_key, &event.pubkey, &event.content) {
event_to_send = Some(NostrEvent {
id: event.id.to_hex(),
pubkey: event.pubkey.to_hex(),
created_at: event.created_at.as_u64(),
kind: event.kind.as_u16() as u32,
tags: event.tags.iter().map(|tag| tag.as_slice().iter().map(|s| s.to_string()).collect()).collect(),
content,
sig: event.sig.to_string(),
message_type: "nip04".to_string(),
});
}
}
}
_ => {}
}
if let Some(ev) = event_to_send {
let mut lock = tx.lock().await;
if let Some(sender) = lock.take() {
let _ = sender.send(ev).await;
return Ok(true); }
}
}
Ok(false)
}
}) => {
match notification_res {
Ok(_) => rx.recv().await.ok_or_else(|| anyhow::anyhow!("Channel closed")),
Err(e) => Err(anyhow::anyhow!("Notification handler error: {}", e)),
}
}
_ = tokio::time::sleep(timeout) => {
Err(anyhow::anyhow!("Timeout waiting for response from {}", sender_pubkey))
}
};
result
}
}
/// Returns a `RelayConfig` with three built-in relay URLs and no private key.
pub fn default_relay_config() -> RelayConfig {
    const DEFAULT_RELAYS: [&str; 3] = [
        "wss://relay.damus.io",
        "wss://nos.lol",
        "wss://relay.nostr.band",
    ];

    let relays: Vec<String> = DEFAULT_RELAYS.iter().map(|url| (*url).to_owned()).collect();
    RelayConfig {
        relays,
        private_key: None,
    }
}
/// Builds a `RelayConfig` from the given relay URLs and optional private key,
/// without validating either.
pub fn custom_relay_config(relays: Vec<String>, private_key: Option<String>) -> RelayConfig {
RelayConfig {
relays,
private_key,
}
}
/// Serializable description of one pod offering (identifier, display text,
/// CPU/memory figures and a rate). Embedded in `OfferEventContent`,
/// `ProviderOfferContent` and `ProviderInfo`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodSpec {
pub id: String, pub name: String, pub description: String, pub cpu_millicores: u64, pub memory_mb: u64, pub rate_msats_per_sec: u64, }
/// JSON payload of the offer event published by `publish_offer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OfferEventContent {
/// Minimum duration, in seconds, carried in the offer.
pub minimum_duration_seconds: u64,
/// Mints listed in the offer.
pub whitelisted_mints: Vec<String>,
/// Pod specifications advertised by the offer.
pub pod_specs: Vec<PodSpec>, }
/// One port entry carried in `AccessDetailsContent::template_ports`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TemplateAccessPort {
/// Port number on the host side.
pub host_port: u16,
/// Port number on the container side.
pub container_port: u16,
/// Protocol name as a free-form string.
pub protocol: String,
/// Label for this port.
pub label: String,
}
/// JSON payload sent by `send_access_details_private_message`.
///
/// `host_address` and `template_ports` default when missing from the input and are
/// left out of the serialized form when empty.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccessDetailsContent {
pub pod_npub: String, pub node_port: u16, pub expires_at: String, pub cpu_millicores: u64, pub memory_mb: u64, pub pod_spec_name: String, pub pod_spec_description: String, pub instructions: Vec<String>,
/// Host address; omitted from the JSON when empty.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub host_address: String,
/// Additional port entries; omitted from the JSON when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub template_ports: Vec<TemplateAccessPort>,
}
/// JSON payload of an error reply: an error type, a message and optional details.
/// Built by `send_error_response`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorResponseContent {
pub error_type: String, pub message: String, pub details: Option<String>, }
/// JSON payload sent by `send_topup_response_private_message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopUpResponseContent {
/// Whether the top-up succeeded.
pub success: bool,
/// Identifier (npub) of the pod the reply refers to.
pub pod_npub: String,
/// Duration, in seconds, carried in the reply.
pub extended_duration_seconds: u64,
/// New expiry, as a string.
pub new_expires_at: String,
/// Free-form message.
pub message: String,
}
/// JSON payload of a spawn request; sent by
/// `send_provisioning_request_private_message` and parsed as
/// `PrivateRequest::Spawn`.
///
/// All `Option` fields after `ssh_password` default to `None` when missing from the
/// input and are omitted from the JSON when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptedSpawnPodRequest {
/// Cashu token accompanying the request.
pub cashu_token: String,
pub pod_spec_id: Option<String>, pub pod_image: String, pub ssh_username: String,
/// SSH password carried in the request.
pub ssh_password: String,
/// Optional template slug.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub template_slug: Option<String>,
/// Optional replication mode.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replication: Option<crate::durable_workload::ReplicationMode>,
/// Optional npub of the primary provider.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub primary_npub: Option<String>,
/// Optional workload identifier.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub workload_id: Option<String>,
/// Optional volume encryption parameters.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub volume_encryption: Option<VolumeEncryption>,
}
/// Volume encryption parameters carried inside a spawn request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VolumeEncryption {
/// Format version; defaults to `volume_encryption_default_version()` when absent.
#[serde(default = "volume_encryption_default_version")]
pub version: u8,
/// Algorithm name (see `VolumeEncryption::ALGORITHM_V1`).
pub algorithm: String,
/// Key encoded as URL-safe, unpadded base64 (see `v1` / `decoded_key`).
pub key_b64: String,
}
/// Serde default for `VolumeEncryption::version`.
fn volume_encryption_default_version() -> u8 {
1
}
impl VolumeEncryption {
    /// Algorithm name written by [`VolumeEncryption::v1`].
    pub const ALGORITHM_V1: &'static str = "luks2-aes-xts";
    /// Version number written by [`VolumeEncryption::v1`].
    pub const VERSION_V1: u8 = 1;

    /// Builds a version-1 value whose key is `key` encoded as URL-safe, unpadded base64.
    pub fn v1(key: [u8; 32]) -> Self {
        use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
        let key_b64 = URL_SAFE_NO_PAD.encode(key);
        let algorithm = String::from(Self::ALGORITHM_V1);
        Self {
            version: Self::VERSION_V1,
            algorithm,
            key_b64,
        }
    }

    /// Decodes `key_b64` back into the raw 32-byte key.
    ///
    /// # Errors
    /// Fails when `key_b64` is not valid URL-safe unpadded base64 or does not decode
    /// to exactly 32 bytes.
    pub fn decoded_key(&self) -> Result<[u8; 32], anyhow::Error> {
        use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
        let raw = match URL_SAFE_NO_PAD.decode(self.key_b64.as_bytes()) {
            Ok(raw) => raw,
            Err(e) => {
                return Err(anyhow::anyhow!(
                    "volume_encryption.key_b64 invalid base64: {}",
                    e
                ))
            }
        };
        // The slice-to-array conversion succeeds only for exactly 32 bytes.
        <[u8; 32]>::try_from(raw.as_slice()).map_err(|_| {
            anyhow::anyhow!(
                "volume_encryption.key_b64 decoded to {} bytes, expected 32",
                raw.len()
            )
        })
    }
}
/// Role of a provider with respect to a primary/standby list, as computed by
/// `warm_standby_role`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WarmStandbyRole {
/// The provider is the primary.
Primary,
/// The provider is listed as a standby: `index` is its position in the standby
/// list and `count` is the length of that list.
Standby { index: usize, count: usize },
/// The provider is neither the primary nor in the standby list.
NotAddressed,
}
/// Determines which role `self_npub` plays for a workload.
///
/// Keys are compared with `npubs_equal`. The primary check wins over the standby
/// list; among standbys the first matching entry determines `index`.
pub fn warm_standby_role(
    self_npub: &str,
    primary_npub: &str,
    standby_providers: &[String],
) -> WarmStandbyRole {
    if npubs_equal(self_npub, primary_npub) {
        return WarmStandbyRole::Primary;
    }

    let slot = standby_providers
        .iter()
        .position(|candidate| npubs_equal(self_npub, candidate));

    match slot {
        Some(index) => WarmStandbyRole::Standby {
            index,
            count: standby_providers.len(),
        },
        None => WarmStandbyRole::NotAddressed,
    }
}
/// Compares two public-key strings.
///
/// When both parse as public keys, the parsed keys are compared. When neither
/// parses, the raw strings are compared. When exactly one parses, the result is
/// `false`.
pub fn npubs_equal(a: &str, b: &str) -> bool {
    let left = nostr_sdk::PublicKey::parse(a);
    let right = nostr_sdk::PublicKey::parse(b);
    match (left, right) {
        (Ok(left_key), Ok(right_key)) => left_key == right_key,
        (Err(_), Err(_)) => a == b,
        _ => false,
    }
}
/// JSON payload of a top-up request (pod identifier plus a Cashu token); parsed as
/// `PrivateRequest::TopUp`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptedTopUpPodRequest {
pub pod_npub: String, pub cashu_token: String,
}
/// Serializes `request` to JSON and sends it to `service_pubkey` through
/// `Client::send_private_msg`, returning the hex id from the send output.
///
/// # Errors
/// Fails on JSON serialization, when `service_pubkey` cannot be parsed, or when
/// sending fails.
pub async fn send_provisioning_request_private_message(
    client: &Client,
    service_pubkey: &str,
    request: EncryptedSpawnPodRequest,
) -> Result<String> {
    let payload = serde_json::to_string(&request)?;
    let recipient = nostr_sdk::PublicKey::parse(service_pubkey)?;
    let sent = client.send_private_msg(recipient, payload, []).await?;
    let hex_id = sent.val.to_hex();
    Ok(hex_id)
}
/// A request received as a private message.
///
/// The enum is `untagged`, so the variant is chosen from the JSON shape alone;
/// serde tries the variants in declaration order and keeps the first that fits.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PrivateRequest {
/// Request to spawn a pod.
Spawn(EncryptedSpawnPodRequest),
/// Request to top up an existing pod.
TopUp(EncryptedTopUpPodRequest),
/// Request for the status of a pod.
Status(StatusRequestContent),
}
/// Parses the decrypted content of a private message into a `PrivateRequest`.
///
/// # Errors
/// Returns an error containing the serde message and at most the first 100 bytes of
/// `content`, followed by `...` when it was shortened. The cut is moved back to the
/// nearest UTF-8 character boundary, so non-ASCII content no longer panics (a plain
/// `&content[..100]` does when byte 100 falls inside a multi-byte character).
pub fn parse_private_message_content(content: &str) -> Result<PrivateRequest> {
    const MAX_ECHOED_BYTES: usize = 100;

    serde_json::from_str::<PrivateRequest>(content).map_err(|e| {
        let truncated_content = if content.len() > MAX_ECHOED_BYTES {
            // Index 0 is always a boundary, so this loop terminates.
            let mut cut = MAX_ECHOED_BYTES;
            while !content.is_char_boundary(cut) {
                cut -= 1;
            }
            format!("{}...", &content[..cut])
        } else {
            content.to_string()
        };
        anyhow::anyhow!(
            "JSON parsing failed: {}. Content: '{}'",
            e,
            truncated_content
        )
    })
}
/// Extracts the lease revocation payload from `event`.
///
/// Returns `None` when the event kind is not `KIND_LEASE_REVOCATION` or when the
/// content is not valid `LeaseRevocationContent` JSON.
pub fn parse_revocation_event(event: &NostrEvent) -> Option<LeaseRevocationContent> {
    let revocation_kind = u32::from(KIND_LEASE_REVOCATION);
    if event.kind == revocation_kind {
        let parsed = serde_json::from_str::<LeaseRevocationContent>(&event.content);
        parsed.ok()
    } else {
        None
    }
}
/// Available-capacity figures (CPU, memory, storage) reported inside a
/// `HeartbeatContent`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CapacityInfo {
pub cpu_available: u64, pub memory_mb_available: u64, pub storage_gb_available: u64, }
/// Isolation tier of a provider. Serialized in kebab-case; defaults to
/// `SharedKernel`. Relative strength is given by `IsolationLevel::rank`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum IsolationLevel {
/// Lowest-ranked tier and the default.
#[default]
SharedKernel,
/// Middle tier.
DedicatedHost,
/// Highest-ranked tier.
AttestedResearchTier,
}
impl IsolationLevel {
pub fn rank(self) -> u8 {
match self {
Self::SharedKernel => 0,
Self::DedicatedHost => 1,
Self::AttestedResearchTier => 2,
}
}
pub fn meets(self, min: IsolationLevel) -> bool {
self.rank() >= min.rank()
}
pub fn from_slug(s: &str) -> Option<Self> {
match s {
"shared-kernel" => Some(Self::SharedKernel),
"dedicated-host" => Some(Self::DedicatedHost),
"attested-research-tier" => Some(Self::AttestedResearchTier),
_ => None,
}
}
pub fn slug(self) -> &'static str {
match self {
Self::SharedKernel => "shared-kernel",
Self::DedicatedHost => "dedicated-host",
Self::AttestedResearchTier => "attested-research-tier",
}
}
}
/// Serde default for the `version` field of the versioned payload structs.
fn default_schema_version() -> u8 {
SCHEMA_VERSION
}
/// JSON payload of a provider offer event (`publish_provider_offer`,
/// `query_providers`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderOfferContent {
/// Provider identity; also used in the offer's `d` tag.
pub provider_npub: String,
/// Provider hostname.
pub hostname: String,
/// Optional location string.
pub location: Option<String>,
pub capabilities: Vec<String>, pub specs: Vec<PodSpec>,
/// Mints listed in the offer.
pub whitelisted_mints: Vec<String>,
/// Uptime figure, as a percentage.
pub uptime_percent: f32,
/// Count of completed jobs.
pub total_jobs_completed: u64,
/// Optional API endpoint.
pub api_endpoint: Option<String>,
/// Schema version; defaults to `SCHEMA_VERSION` when absent.
#[serde(default = "default_schema_version")]
pub version: u8,
/// Isolation tier; defaults to `IsolationLevel::SharedKernel` when absent.
#[serde(default)]
pub isolation_level: IsolationLevel,
/// Optional stake proof; omitted from the JSON when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub stake_proof: Option<crate::stake::StakeProof>,
}
/// JSON payload of a heartbeat event (`publish_heartbeat`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatContent {
/// Provider identity; parsed as a public key when publishing.
pub provider_npub: String,
/// Heartbeat time; divided by `HEARTBEAT_BUCKET_SECS` to form the `d`-tag bucket.
pub timestamp: u64,
/// Number of active workloads.
pub active_workloads: u32,
/// Capacity currently available.
pub available_capacity: CapacityInfo,
/// Schema version; defaults to `SCHEMA_VERSION` when absent.
#[serde(default = "default_schema_version")]
pub version: u8,
}
/// JSON payload of a lease revocation event (`publish_lease_revocation`,
/// `parse_revocation_event`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LeaseRevocationContent {
/// Workload the revocation refers to.
pub workload_id: String,
/// Identity of the primary provider; part of the event's `d` tag.
pub primary_provider_npub: String,
/// Standby providers; each parseable entry is added to the event as a public-key tag.
pub standby_providers: Vec<String>,
/// Free-form reason.
pub reason: String,
/// Revocation time.
pub revoked_at: u64,
/// Optional state URI; omitted from the JSON when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub state_uri: Option<String>,
/// Schema version; defaults to `SCHEMA_VERSION` when absent.
#[serde(default = "default_schema_version")]
pub version: u8,
}
/// JSON payload of a standby promotion announcement event.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StandbyPromotionAnnouncementContent {
/// Workload the announcement refers to; part of the event's `d` tag.
pub workload_id: String,
/// Identity of the new primary.
pub new_primary_npub: String,
/// Promotion time.
pub promoted_at: u64,
/// Schema version; defaults to `SCHEMA_VERSION` when absent.
#[serde(default = "default_schema_version")]
pub version: u8,
}
/// Provider summary record. It mirrors the descriptive fields of
/// `ProviderOfferContent` and adds `last_seen` / `is_online`; nothing in this file
/// constructs it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderInfo {
/// Provider identity.
pub npub: String,
/// Provider hostname.
pub hostname: String,
/// Optional location string.
pub location: Option<String>,
/// Capability labels.
pub capabilities: Vec<String>,
/// Pod specifications offered.
pub specs: Vec<PodSpec>,
/// Mints listed by the provider.
pub whitelisted_mints: Vec<String>,
/// Uptime figure, as a percentage.
pub uptime_percent: f32,
/// Count of completed jobs.
pub total_jobs_completed: u64,
pub last_seen: u64, pub is_online: bool,
/// Isolation tier.
pub isolation_level: IsolationLevel,
}
/// Optional provider selection criteria; every field defaults to `None`.
/// Nothing in this file applies the filter.
#[derive(Debug, Clone, Default)]
pub struct ProviderFilter {
/// Capability label to look for.
pub capability: Option<String>,
/// Minimum uptime figure.
pub min_uptime: Option<f32>,
/// Minimum memory, in MB.
pub min_memory_mb: Option<u64>,
/// Minimum CPU figure.
pub min_cpu: Option<u64>,
/// Isolation tier criterion.
pub isolation_level: Option<IsolationLevel>,
}
impl NostrRelaySubscriber {
/// Publishes `offer` as a `KIND_PROVIDER_OFFER` event.
///
/// The event carries the hashtags `paygress` and `compute`, a `d` tag of the form
/// `paygress:offer:v{version}:{provider_npub}` and a `v` tag with the version.
/// Returns the hex id of the signed event.
///
/// # Errors
/// Fails on JSON serialization, tag construction, signing, or when sending fails
/// (the send failure is also logged).
pub async fn publish_provider_offer(&self, offer: ProviderOfferContent) -> Result<String> {
    let payload = serde_json::to_string(&offer)?;
    info!("Publishing provider offer for {}", offer.hostname);

    let identifier = format!("paygress:offer:v{}:{}", offer.version, offer.provider_npub);
    let version_label = offer.version.to_string();
    let tags = vec![
        Tag::hashtag("paygress"),
        Tag::hashtag("compute"),
        Tag::parse(["d", identifier.as_str()])?,
        Tag::parse(["v", version_label.as_str()])?,
    ];

    let signed = EventBuilder::new(Kind::Custom(KIND_PROVIDER_OFFER), payload)
        .tags(tags)
        .sign_with_keys(&self.keys)?;
    let event_id = signed.id.to_hex();

    let res = self.client.send_event(&signed).await.map_err(|e| {
        error!("❌ Failed to publish provider offer: {}", e);
        e
    })?;
    info!("✅ Published provider offer: {} ({:?})", event_id, res);
    Ok(event_id)
}
/// Publishes `heartbeat` twice: once as a `KIND_PROVIDER_HEARTBEAT` event with a
/// bucketed `d` tag, and once as a `KIND_PROVIDER_HEARTBEAT_EPHEMERAL` event
/// without it.
///
/// Returns the hex id of the stored event together with the relays listed in the
/// `success` set of its send output. Send failures of either event are logged as
/// warnings and do not fail the call; when the stored send fails the relay list is
/// empty.
///
/// # Errors
/// Fails on JSON serialization, when `provider_npub` cannot be parsed, or on tag
/// construction / signing.
pub async fn publish_heartbeat(
    &self,
    heartbeat: HeartbeatContent,
) -> Result<(String, Vec<String>)> {
    let payload = serde_json::to_string(&heartbeat)?;

    // One `d` value per bucket of HEARTBEAT_BUCKET_SECS seconds.
    let bucket = heartbeat.timestamp / HEARTBEAT_BUCKET_SECS;
    let identifier = format!(
        "paygress:heartbeat:v{}:{}:{}",
        heartbeat.version, heartbeat.provider_npub, bucket
    );
    let provider_pk = nostr_sdk::PublicKey::parse(&heartbeat.provider_npub)?;
    let version_label = heartbeat.version.to_string();

    let stored_event = EventBuilder::new(Kind::Custom(KIND_PROVIDER_HEARTBEAT), payload.clone())
        .tags(vec![
            Tag::hashtag("paygress-heartbeat"),
            Tag::public_key(provider_pk),
            Tag::parse(["d", identifier.as_str()])?,
            Tag::parse(["v", version_label.as_str()])?,
        ])
        .sign_with_keys(&self.keys)?;
    let stored_id = stored_event.id.to_hex();

    let ephemeral_event =
        EventBuilder::new(Kind::Custom(KIND_PROVIDER_HEARTBEAT_EPHEMERAL), payload)
            .tags(vec![
                Tag::hashtag("paygress-heartbeat"),
                Tag::public_key(provider_pk),
                Tag::parse(["v", version_label.as_str()])?,
            ])
            .sign_with_keys(&self.keys)?;

    let accepting_relays: Vec<String> = match self.client.send_event(&stored_event).await {
        Ok(out) => {
            debug!("📦 Stored heartbeat published: {}", stored_id);
            out.success.iter().map(|url| url.to_string()).collect()
        }
        Err(e) => {
            warn!("Failed to publish stored heartbeat: {}", e);
            Vec::new()
        }
    };

    if let Err(e) = self.client.send_event(&ephemeral_event).await {
        warn!("Failed to publish ephemeral heartbeat: {}", e);
    } else {
        debug!("⚡ Ephemeral heartbeat published");
    }

    info!(
        "💓 Heartbeat published (stored + ephemeral): {} accepted by {} relay(s)",
        stored_id,
        accepting_relays.len()
    );
    Ok((stored_id, accepting_relays))
}
/// Publishes `revocation` as a `KIND_LEASE_REVOCATION` event.
///
/// Besides the hashtags and the `d`, `v` and `workload` tags, one public-key tag is
/// added per standby provider whose npub parses; unparseable entries are skipped
/// with a warning. Returns the hex id of the signed event.
///
/// # Errors
/// Fails on JSON serialization, tag construction, signing, or when sending fails
/// (the send failure is also logged).
pub async fn publish_lease_revocation(
    &self,
    revocation: LeaseRevocationContent,
) -> Result<String> {
    let payload = serde_json::to_string(&revocation)?;
    let identifier = format!(
        "paygress:revocation:v{}:{}:{}",
        revocation.version, revocation.primary_provider_npub, revocation.workload_id
    );
    let version_label = revocation.version.to_string();

    let mut tags = vec![
        Tag::hashtag("paygress"),
        Tag::hashtag("paygress-revocation"),
        Tag::parse(["d", identifier.as_str()])?,
        Tag::parse(["v", version_label.as_str()])?,
        Tag::parse(["workload", revocation.workload_id.as_str()])?,
    ];
    for standby_npub in &revocation.standby_providers {
        match nostr_sdk::PublicKey::parse(standby_npub) {
            Ok(pk) => tags.push(Tag::public_key(pk)),
            Err(_) => warn!(
                "Skipping unparseable standby npub in revocation: {}",
                standby_npub
            ),
        }
    }

    let signed = EventBuilder::new(Kind::Custom(KIND_LEASE_REVOCATION), payload)
        .tags(tags)
        .sign_with_keys(&self.keys)?;
    let event_id = signed.id.to_hex();

    let out = self.client.send_event(&signed).await.map_err(|e| {
        error!("Failed to publish lease revocation: {}", e);
        e
    })?;
    info!(
        "📜 Lease revocation published for workload {}: {} accepted by {} relay(s)",
        revocation.workload_id,
        event_id,
        out.success.len()
    );
    Ok(event_id)
}
/// Publishes `announcement` as a `KIND_STANDBY_PROMOTION_ANNOUNCEMENT` event with a
/// `d` tag of the form `paygress:promoted:v{version}:{workload_id}`.
///
/// Returns the hex id of the signed event.
///
/// # Errors
/// Fails on JSON serialization, tag construction, signing, or when sending fails
/// (the send failure is also logged).
pub async fn publish_standby_promotion_announcement(
    &self,
    announcement: StandbyPromotionAnnouncementContent,
) -> Result<String> {
    let payload = serde_json::to_string(&announcement)?;
    let identifier = format!(
        "paygress:promoted:v{}:{}",
        announcement.version, announcement.workload_id
    );
    let version_label = announcement.version.to_string();

    let tags = vec![
        Tag::hashtag("paygress"),
        Tag::hashtag("paygress-promoted"),
        Tag::parse(["d", identifier.as_str()])?,
        Tag::parse(["v", version_label.as_str()])?,
        Tag::parse(["workload", announcement.workload_id.as_str()])?,
    ];
    let signed = EventBuilder::new(Kind::Custom(KIND_STANDBY_PROMOTION_ANNOUNCEMENT), payload)
        .tags(tags)
        .sign_with_keys(&self.keys)?;
    let event_id = signed.id.to_hex();

    let out = self.client.send_event(&signed).await.map_err(|e| {
        error!("Failed to publish standby promotion announcement: {}", e);
        e
    })?;
    info!(
        "📢 Standby promotion announcement published for workload {}: {} accepted by {} relay(s)",
        announcement.workload_id,
        event_id,
        out.success.len()
    );
    Ok(event_id)
}
/// Looks for a standby promotion announcement for `workload_id` authored by one of
/// `peer_npubs`.
///
/// Peers whose npub does not parse are ignored; when no usable peer remains,
/// `Ok(None)` is returned without querying. The query matches on the `d` tag built
/// from `SCHEMA_VERSION` and `workload_id`, waits up to 5 seconds, and returns the
/// first fetched event whose content parses and names the same workload.
///
/// # Errors
/// Fails when fetching events fails.
pub async fn query_standby_promotion_announcements(
    &self,
    workload_id: &str,
    peer_npubs: &[String],
) -> Result<Option<StandbyPromotionAnnouncementContent>> {
    let authors: Vec<nostr_sdk::PublicKey> = peer_npubs
        .iter()
        .filter_map(|npub| nostr_sdk::PublicKey::parse(npub).ok())
        .collect();
    // Covers both "no peers given" and "no peer parsed".
    if authors.is_empty() {
        return Ok(None);
    }

    let d_letter = nostr_sdk::SingleLetterTag::lowercase(nostr_sdk::Alphabet::D);
    let d_value = format!("paygress:promoted:v{}:{}", SCHEMA_VERSION, workload_id);
    let filter = Filter::new()
        .kind(Kind::Custom(KIND_STANDBY_PROMOTION_ANNOUNCEMENT))
        .authors(authors)
        .custom_tag(d_letter, d_value);

    let events = self
        .client
        .fetch_events(filter, std::time::Duration::from_secs(5))
        .await?;

    let announcement = events.iter().find_map(|event| {
        serde_json::from_str::<StandbyPromotionAnnouncementContent>(&event.content)
            .ok()
            .filter(|content| content.workload_id == workload_id)
    });
    Ok(announcement)
}
/// Fetches `KIND_PROVIDER_OFFER` events tagged `paygress` (waiting up to 5 seconds)
/// and returns the offers whose content parses. Unparseable offers are skipped with
/// a warning.
///
/// # Errors
/// Fails when fetching events fails.
pub async fn query_providers(&self) -> Result<Vec<ProviderOfferContent>> {
    let filter = Filter::new()
        .kind(Kind::Custom(KIND_PROVIDER_OFFER))
        .hashtag("paygress");
    let events = self
        .client
        .fetch_events(filter, std::time::Duration::from_secs(5))
        .await?;

    let providers: Vec<ProviderOfferContent> = events
        .into_iter()
        .filter_map(|event| {
            match serde_json::from_str::<ProviderOfferContent>(&event.content) {
                Ok(offer) => Some(offer),
                Err(e) => {
                    warn!("Failed to parse provider offer {}: {}", event.id, e);
                    None
                }
            }
        })
        .collect();

    info!("Found {} providers", providers.len());
    Ok(providers)
}
/// Fetches the stored heartbeats authored by `provider_npub` since `since_secs`
/// (waiting up to 5 seconds) and returns those whose content parses. Unparseable
/// heartbeats are skipped with a warning.
///
/// # Errors
/// Fails when `provider_npub` cannot be parsed or fetching events fails.
pub async fn query_heartbeats(
    &self,
    provider_npub: &str,
    since_secs: u64,
) -> Result<Vec<HeartbeatContent>> {
    let author = nostr_sdk::PublicKey::parse(provider_npub)?;
    let filter = Filter::new()
        .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
        .author(author)
        .since(Timestamp::from(since_secs));
    let events = self
        .client
        .fetch_events(filter, std::time::Duration::from_secs(5))
        .await?;

    let heartbeats: Vec<HeartbeatContent> = events
        .into_iter()
        .filter_map(|event| {
            match serde_json::from_str::<HeartbeatContent>(&event.content) {
                Ok(hb) => Some(hb),
                Err(e) => {
                    warn!("Failed to parse heartbeat {}: {}", event.id, e);
                    None
                }
            }
        })
        .collect();
    Ok(heartbeats)
}
/// Fetches one stored heartbeat authored by `provider_npub` from the last
/// `LIVE_HEARTBEAT_WINDOW_SECS` seconds (query limit 1, waiting up to 3 seconds).
///
/// Returns `Ok(None)` when no event is returned or when the first returned event
/// does not parse (the parse failure is logged).
///
/// The window start uses saturating subtraction, so a system clock within the first
/// `LIVE_HEARTBEAT_WINDOW_SECS` seconds of the Unix epoch no longer underflows.
///
/// # Errors
/// Fails when `provider_npub` cannot be parsed, the system clock is before the Unix
/// epoch, or fetching events fails.
pub async fn get_latest_heartbeat(
    &self,
    provider_npub: &str,
) -> Result<Option<HeartbeatContent>> {
    let provider_pubkey = nostr_sdk::PublicKey::parse(provider_npub)?;
    let live_since = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs()
        .saturating_sub(LIVE_HEARTBEAT_WINDOW_SECS);
    let filter = Filter::new()
        .kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
        .author(provider_pubkey)
        .since(Timestamp::from(live_since))
        .limit(1);
    let events = self
        .client
        .fetch_events(filter, std::time::Duration::from_secs(3))
        .await?;
    if let Some(event) = events.first() {
        match serde_json::from_str::<HeartbeatContent>(&event.content) {
            Ok(hb) => return Ok(Some(hb)),
            Err(e) => warn!("Failed to parse heartbeat: {}", e),
        }
    }
    Ok(None)
}
pub async fn get_latest_heartbeats_multi(
&self,
provider_npubs: Vec<String>,
) -> Result<std::collections::HashMap<String, HeartbeatContent>> {
if provider_npubs.is_empty() {
return Ok(std::collections::HashMap::new());
}
let mut pubkeys = Vec::new();
for npub in provider_npubs {
if let Ok(pk) = nostr_sdk::PublicKey::parse(&npub) {
pubkeys.push(pk);
}
}
let live_since = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs()
- LIVE_HEARTBEAT_WINDOW_SECS;
let filter = Filter::new()
.kind(Kind::Custom(KIND_PROVIDER_HEARTBEAT))
.authors(pubkeys)
.since(Timestamp::from(live_since));
let events = self
.client
.fetch_events(filter, std::time::Duration::from_secs(3))
.await?;
let mut heartbeats = std::collections::HashMap::new();
for event in events {
if let Ok(hb) = serde_json::from_str::<HeartbeatContent>(&event.content) {
match heartbeats.entry(hb.provider_npub.clone()) {
std::collections::hash_map::Entry::Occupied(mut entry) => {
let existing: &HeartbeatContent = entry.get();
if hb.timestamp > existing.timestamp {
entry.insert(hb);
}
}
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(hb);
}
}
}
}
Ok(heartbeats)
}
/// Estimates the uptime percentage of `provider_npub` over the last `days` days.
///
/// The estimate is the number of stored heartbeats returned by `query_heartbeats`
/// divided by the number expected at one heartbeat per `HEARTBEAT_BUCKET_SECS`
/// seconds, capped at 100. Returns `0.0` when no heartbeat is found.
///
/// The window start uses saturating subtraction, so a `days` value reaching back
/// before the Unix epoch clamps to 0 instead of underflowing.
///
/// # Errors
/// Fails when the system clock is before the Unix epoch or the heartbeat query fails.
pub async fn calculate_uptime(&self, provider_npub: &str, days: u32) -> Result<f32> {
    const SECS_PER_DAY: u64 = 24 * 60 * 60;

    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)?
        .as_secs();
    // u32::MAX days * 86_400 fits in u64; only the subtraction can underflow.
    let since = now.saturating_sub(u64::from(days) * SECS_PER_DAY);
    let heartbeats = self.query_heartbeats(provider_npub, since).await?;
    if heartbeats.is_empty() {
        return Ok(0.0);
    }
    let expected = (days as f32) * 24.0 * 3600.0 / HEARTBEAT_BUCKET_SECS as f32;
    let actual = heartbeats.len() as f32;
    Ok((actual / expected * 100.0).min(100.0))
}
}
/// JSON payload of a status request; parsed as `PrivateRequest::Status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusRequestContent {
pub pod_id: String, }
/// JSON payload sent by `send_status_response`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StatusResponseContent {
/// Identifier of the pod the reply refers to.
pub pod_id: String,
/// Status as a free-form string.
pub status: String,
/// Expiry, as a string.
pub expires_at: String,
/// Remaining time, in seconds.
pub time_remaining_seconds: u64,
/// CPU figure, in millicores.
pub cpu_millicores: u64,
/// Memory figure, in MB.
pub memory_mb: u64,
/// SSH host.
pub ssh_host: String,
/// SSH port.
pub ssh_port: u16,
/// SSH user name.
pub ssh_username: String,
}
// Unit tests for `IsolationLevel`: rank ordering, `meets`, and slug round-tripping.
#[cfg(test)]
mod isolation_level_tests {
use super::IsolationLevel;
// Ranks must be strictly increasing from SharedKernel to AttestedResearchTier.
#[test]
fn rank_orders_isolation_strength() {
assert!(IsolationLevel::SharedKernel.rank() < IsolationLevel::DedicatedHost.rank());
assert!(IsolationLevel::DedicatedHost.rank() < IsolationLevel::AttestedResearchTier.rank());
}
// Full 3x3 matrix: a tier meets a minimum iff its rank is equal or higher.
#[test]
fn meets_accepts_equal_or_stricter_tiers() {
assert!(IsolationLevel::SharedKernel.meets(IsolationLevel::SharedKernel));
assert!(IsolationLevel::DedicatedHost.meets(IsolationLevel::SharedKernel));
assert!(IsolationLevel::AttestedResearchTier.meets(IsolationLevel::SharedKernel));
assert!(!IsolationLevel::SharedKernel.meets(IsolationLevel::DedicatedHost));
assert!(IsolationLevel::DedicatedHost.meets(IsolationLevel::DedicatedHost));
assert!(IsolationLevel::AttestedResearchTier.meets(IsolationLevel::DedicatedHost));
assert!(!IsolationLevel::SharedKernel.meets(IsolationLevel::AttestedResearchTier));
assert!(!IsolationLevel::DedicatedHost.meets(IsolationLevel::AttestedResearchTier));
assert!(IsolationLevel::AttestedResearchTier.meets(IsolationLevel::AttestedResearchTier));
}
// `from_slug` must invert `slug` for every variant.
#[test]
fn slug_round_trips() {
for level in [
IsolationLevel::SharedKernel,
IsolationLevel::DedicatedHost,
IsolationLevel::AttestedResearchTier,
] {
assert_eq!(IsolationLevel::from_slug(level.slug()), Some(level));
}
}
// Unknown, empty and snake_case slugs are all rejected.
#[test]
fn from_slug_rejects_unknown() {
assert!(IsolationLevel::from_slug("paranoid-mode").is_none());
assert!(IsolationLevel::from_slug("").is_none());
assert!(IsolationLevel::from_slug("dedicated_host").is_none());
}
}
// Unit tests for `npubs_equal`: same key in bech32 and hex, different keys, and
// strings that do not parse as keys.
#[cfg(test)]
mod npubs_equal_tests {
use super::*;
// Two encodings used by the tests below as the same public key.
const PUBKEY_BECH32: &str = "npub1ae40uj62de87f8tvx56e6ytp5m7jd7l96mh0ew43e8q5wucm7z9q2uqvuc";
const PUBKEY_HEX: &str = "ee6afe4b4a6e4fe49d6c35359d1161a6fd26fbe5d6eefcbab1c9c147731bf08a";
#[test]
fn bech32_matches_itself() {
assert!(npubs_equal(PUBKEY_BECH32, PUBKEY_BECH32));
}
#[test]
fn hex_matches_itself() {
assert!(npubs_equal(PUBKEY_HEX, PUBKEY_HEX));
}
// Comparison is on the parsed key, so the encoding must not matter (either order).
#[test]
fn bech32_matches_hex_for_same_key() {
assert!(npubs_equal(PUBKEY_BECH32, PUBKEY_HEX));
assert!(npubs_equal(PUBKEY_HEX, PUBKEY_BECH32));
}
#[test]
fn different_keys_in_different_encodings_do_not_match() {
let other_bech32 = "npub1hyr9m7zeegr98w4e07gvdpqrk25jfp3vku8029u8pcxsc48dq6nqxtwztv";
assert!(!npubs_equal(PUBKEY_HEX, other_bech32));
}
// When neither side parses, plain string equality decides.
#[test]
fn unparseable_strings_fall_back_to_string_equality() {
assert!(npubs_equal("npub1primary", "npub1primary"));
assert!(!npubs_equal("npub1primary", "npub1secondary"));
}
// When exactly one side parses, the result is always false.
#[test]
fn one_real_one_typoed_returns_false() {
assert!(!npubs_equal(PUBKEY_BECH32, "npub1primary"));
assert!(!npubs_equal("npub1primary", PUBKEY_HEX));
}
}