use hashtree_core::Hash;
use nostr_sdk::nostr::{Event, Kind};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::Duration;
use tokio::time::Instant;
use crate::mesh_store_core::RequestDispatchConfig;
use crate::peer_selector::SelectionStrategy;
fn default_hash_get_enabled() -> bool {
true
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct PeerId {
pub pubkey: String,
}
impl PeerId {
pub fn new(pubkey: String) -> Self {
Self { pubkey }
}
pub fn to_peer_string(&self) -> String {
self.pubkey.clone()
}
pub fn from_peer_string(s: &str) -> Option<Self> {
let pubkey = s.trim();
if pubkey.is_empty() || pubkey.contains(':') {
return None;
}
Some(Self {
pubkey: pubkey.to_string(),
})
}
pub fn from_string(s: &str) -> Option<Self> {
Self::from_peer_string(s)
}
pub fn short(&self) -> String {
self.pubkey[..8.min(self.pubkey.len())].to_string()
}
}
impl std::fmt::Display for PeerId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.pubkey)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum SignalingMessage {
#[serde(rename = "hello")]
Hello {
#[serde(rename = "peerId")]
peer_id: String,
roots: Vec<String>,
#[serde(rename = "hashGet", default = "default_hash_get_enabled")]
hash_get: bool,
},
#[serde(rename = "offer")]
Offer {
#[serde(rename = "peerId")]
peer_id: String,
#[serde(rename = "targetPeerId")]
target_peer_id: String,
sdp: String,
},
#[serde(rename = "answer")]
Answer {
#[serde(rename = "peerId")]
peer_id: String,
#[serde(rename = "targetPeerId")]
target_peer_id: String,
sdp: String,
},
#[serde(rename = "candidate")]
Candidate {
#[serde(rename = "peerId")]
peer_id: String,
#[serde(rename = "targetPeerId")]
target_peer_id: String,
candidate: String,
#[serde(rename = "sdpMLineIndex")]
sdp_m_line_index: Option<u16>,
#[serde(rename = "sdpMid")]
sdp_mid: Option<String>,
},
#[serde(rename = "candidates")]
Candidates {
#[serde(rename = "peerId")]
peer_id: String,
#[serde(rename = "targetPeerId")]
target_peer_id: String,
candidates: Vec<IceCandidate>,
},
}
impl SignalingMessage {
pub fn msg_type(&self) -> &str {
match self {
Self::Hello { .. } => "hello",
Self::Offer { .. } => "offer",
Self::Answer { .. } => "answer",
Self::Candidate { .. } => "candidate",
Self::Candidates { .. } => "candidates",
}
}
pub fn peer_id(&self) -> &str {
match self {
Self::Hello { peer_id, .. } => peer_id,
Self::Offer { peer_id, .. } => peer_id,
Self::Answer { peer_id, .. } => peer_id,
Self::Candidate { peer_id, .. } => peer_id,
Self::Candidates { peer_id, .. } => peer_id,
}
}
pub fn target_peer_id(&self) -> Option<&str> {
match self {
Self::Hello { .. } => None, Self::Offer { target_peer_id, .. } => Some(target_peer_id),
Self::Answer { target_peer_id, .. } => Some(target_peer_id),
Self::Candidate { target_peer_id, .. } => Some(target_peer_id),
Self::Candidates { target_peer_id, .. } => Some(target_peer_id),
}
}
pub fn is_for(&self, my_peer_id: &str) -> bool {
match self.target_peer_id() {
Some(target) => target == my_peer_id,
None => true, }
}
}
#[inline]
pub fn is_polite_peer(local_peer_id: &str, remote_peer_id: &str) -> bool {
local_peer_id < remote_peer_id
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IceCandidate {
pub candidate: String,
#[serde(rename = "sdpMLineIndex")]
pub sdp_m_line_index: Option<u16>,
#[serde(rename = "sdpMid")]
pub sdp_mid: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum DataMessage {
#[serde(rename = "req")]
Request {
id: u32,
hash: String,
#[serde(skip_serializing_if = "Option::is_none")]
htl: Option<u8>,
},
#[serde(rename = "res")]
Response {
id: u32,
hash: String,
found: bool,
#[serde(skip_serializing_if = "Option::is_none")]
size: Option<u64>,
},
#[serde(rename = "push")]
Push { hash: String },
#[serde(rename = "have")]
Have { hashes: Vec<String> },
#[serde(rename = "want")]
Want { hashes: Vec<String> },
#[serde(rename = "root")]
RootUpdate {
hash: String,
#[serde(skip_serializing_if = "Option::is_none")]
size: Option<u64>,
},
}
pub const MAX_HTL: u8 = 10;
pub const DECREMENT_AT_MAX_PROB: f64 = 0.5;
pub const DECREMENT_AT_MIN_PROB: f64 = 0.25;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HtlMode {
Probabilistic,
}
#[derive(Debug, Clone, Copy)]
pub struct HtlPolicy {
pub mode: HtlMode,
pub max_htl: u8,
pub p_at_max: f64,
pub p_at_min: f64,
}
pub const BLOB_REQUEST_POLICY: HtlPolicy = HtlPolicy {
mode: HtlMode::Probabilistic,
max_htl: MAX_HTL,
p_at_max: DECREMENT_AT_MAX_PROB,
p_at_min: DECREMENT_AT_MIN_PROB,
};
pub const MESH_EVENT_POLICY: HtlPolicy = HtlPolicy {
mode: HtlMode::Probabilistic,
max_htl: 4,
p_at_max: 0.75,
p_at_min: 0.5,
};
#[derive(Debug, Clone, Copy)]
pub struct PeerHTLConfig {
pub at_max_sample: f64,
pub at_min_sample: f64,
}
impl PeerHTLConfig {
pub fn random() -> Self {
use rand::Rng;
let mut rng = rand::thread_rng();
Self::from_samples(rng.gen_range(0.0..1.0), rng.gen_range(0.0..1.0))
}
pub fn from_samples(at_max_sample: f64, at_min_sample: f64) -> Self {
Self {
at_max_sample: at_max_sample.clamp(0.0, 1.0),
at_min_sample: at_min_sample.clamp(0.0, 1.0),
}
}
pub fn from_flags(decrement_at_max: bool, decrement_at_min: bool) -> Self {
Self {
at_max_sample: if decrement_at_max { 0.0 } else { 1.0 },
at_min_sample: if decrement_at_min { 0.0 } else { 1.0 },
}
}
pub fn decrement(&self, htl: u8) -> u8 {
decrement_htl_with_policy(htl, &BLOB_REQUEST_POLICY, self)
}
pub fn decrement_with_policy(&self, htl: u8, policy: &HtlPolicy) -> u8 {
decrement_htl_with_policy(htl, policy, self)
}
}
impl Default for PeerHTLConfig {
fn default() -> Self {
Self::random()
}
}
pub fn decrement_htl_with_policy(htl: u8, policy: &HtlPolicy, config: &PeerHTLConfig) -> u8 {
let htl = htl.min(policy.max_htl);
if htl == 0 {
return 0;
}
match policy.mode {
HtlMode::Probabilistic => {
let p_at_max = policy.p_at_max.clamp(0.0, 1.0);
let p_at_min = policy.p_at_min.clamp(0.0, 1.0);
if htl == policy.max_htl {
return if config.at_max_sample < p_at_max {
htl.saturating_sub(1)
} else {
htl
};
}
if htl == 1 {
return if config.at_min_sample < p_at_min {
0
} else {
htl
};
}
htl.saturating_sub(1)
}
}
}
pub fn should_forward_htl(htl: u8) -> bool {
htl > 0
}
pub fn should_forward(htl: u8) -> bool {
should_forward_htl(htl)
}
use tokio::sync::{mpsc, oneshot};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PeerPool {
Follows,
Other,
}
#[derive(Debug, Clone, Copy)]
pub struct PoolConfig {
pub max_connections: usize,
pub satisfied_connections: usize,
}
impl PoolConfig {
#[inline]
pub fn can_accept(&self, current_count: usize) -> bool {
current_count < self.max_connections
}
#[inline]
pub fn needs_peers(&self, current_count: usize) -> bool {
current_count < self.satisfied_connections
}
#[inline]
pub fn is_satisfied(&self, current_count: usize) -> bool {
current_count >= self.satisfied_connections
}
}
impl Default for PoolConfig {
fn default() -> Self {
Self {
max_connections: 16,
satisfied_connections: 8,
}
}
}
#[derive(Debug, Clone)]
pub struct PoolSettings {
pub follows: PoolConfig,
pub other: PoolConfig,
}
impl Default for PoolSettings {
fn default() -> Self {
Self {
follows: PoolConfig {
max_connections: 16,
satisfied_connections: 8,
},
other: PoolConfig {
max_connections: 16,
satisfied_connections: 8,
},
}
}
}
pub struct ClassifyRequest {
pub pubkey: String,
pub response: oneshot::Sender<PeerPool>,
}
pub type ClassifierTx = mpsc::Sender<ClassifyRequest>;
pub type ClassifierRx = mpsc::Receiver<ClassifyRequest>;
pub fn classifier_channel(buffer: usize) -> (ClassifierTx, ClassifierRx) {
mpsc::channel(buffer)
}
#[derive(Clone)]
pub struct MeshStoreConfig {
pub relays: Vec<String>,
pub roots: Vec<Hash>,
pub request_timeout_ms: u64,
pub hello_interval_ms: u64,
pub debug: bool,
pub pools: PoolSettings,
pub classifier_tx: Option<ClassifierTx>,
pub request_selection_strategy: SelectionStrategy,
pub request_fairness_enabled: bool,
pub request_dispatch: RequestDispatchConfig,
pub upstream_blossom_servers: Vec<String>,
}
impl Default for MeshStoreConfig {
fn default() -> Self {
Self {
relays: vec![
"wss://temp.iris.to".to_string(),
"wss://relay.damus.io".to_string(),
],
roots: Vec::new(),
request_timeout_ms: 10000,
hello_interval_ms: 30000,
debug: false,
pools: PoolSettings::default(),
classifier_tx: None,
request_selection_strategy: SelectionStrategy::Weighted,
request_fairness_enabled: true,
request_dispatch: RequestDispatchConfig {
initial_fanout: 2,
hedge_fanout: 1,
max_fanout: 8,
hedge_interval_ms: 120,
},
upstream_blossom_servers: Vec::new(),
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerState {
New,
Connecting,
Connected,
Ready,
Disconnected,
}
#[derive(Debug, Clone, Default)]
pub struct MeshStats {
pub connected_peers: usize,
pub pending_requests: usize,
pub bytes_sent: u64,
pub bytes_received: u64,
pub requests_made: u64,
pub requests_fulfilled: u64,
}
pub type WebRTCStats = MeshStats;
pub const NOSTR_KIND_HASHTREE: u16 = 25050;
pub const MESH_SIGNALING_EVENT_KIND: u16 = NOSTR_KIND_HASHTREE;
pub const MESH_PROTOCOL: &str = "htree.nostr.mesh.v1";
pub const MESH_PROTOCOL_VERSION: u8 = 1;
pub const MESH_DEFAULT_HTL: u8 = MESH_EVENT_POLICY.max_htl;
pub const MESH_MAX_HTL: u8 = 6;
#[derive(Debug)]
pub struct TimedSeenSet {
entries: HashMap<String, Instant>,
order: VecDeque<(String, Instant)>,
ttl: Duration,
capacity: usize,
}
impl TimedSeenSet {
pub fn new(capacity: usize, ttl: Duration) -> Self {
Self {
entries: HashMap::new(),
order: VecDeque::new(),
ttl,
capacity,
}
}
fn prune(&mut self, now: Instant) {
while let Some((key, inserted_at)) = self.order.front().cloned() {
if now.duration_since(inserted_at) < self.ttl {
break;
}
self.order.pop_front();
if self
.entries
.get(&key)
.map(|ts| *ts == inserted_at)
.unwrap_or(false)
{
self.entries.remove(&key);
}
}
while self.entries.len() > self.capacity {
if let Some((key, inserted_at)) = self.order.pop_front() {
if self
.entries
.get(&key)
.map(|ts| *ts == inserted_at)
.unwrap_or(false)
{
self.entries.remove(&key);
}
} else {
break;
}
}
}
pub fn insert_if_new(&mut self, key: String) -> bool {
let now = Instant::now();
self.prune(now);
if self.entries.contains_key(&key) {
return false;
}
self.entries.insert(key.clone(), now);
self.order.push_back((key, now));
self.prune(now);
true
}
pub fn contains(&mut self, key: &str) -> bool {
let now = Instant::now();
self.prune(now);
self.entries.contains_key(key)
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum MeshNostrPayload {
#[serde(rename = "EVENT")]
Event { event: Event },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MeshNostrFrame {
pub protocol: String,
pub version: u8,
pub frame_id: String,
pub htl: u8,
pub sender_peer_id: String,
pub payload: MeshNostrPayload,
}
impl MeshNostrFrame {
pub fn new_event(event: Event, sender_peer_id: &str, htl: u8) -> Self {
Self::new_event_with_id(
event,
sender_peer_id,
&uuid::Uuid::new_v4().to_string(),
htl,
)
}
pub fn new_event_with_id(event: Event, sender_peer_id: &str, frame_id: &str, htl: u8) -> Self {
Self {
protocol: MESH_PROTOCOL.to_string(),
version: MESH_PROTOCOL_VERSION,
frame_id: frame_id.to_string(),
htl,
sender_peer_id: sender_peer_id.to_string(),
payload: MeshNostrPayload::Event { event },
}
}
pub fn event(&self) -> &Event {
match &self.payload {
MeshNostrPayload::Event { event } => event,
}
}
}
pub fn validate_mesh_frame(frame: &MeshNostrFrame) -> Result<(), &'static str> {
if frame.protocol != MESH_PROTOCOL {
return Err("invalid protocol");
}
if frame.version != MESH_PROTOCOL_VERSION {
return Err("invalid version");
}
if frame.frame_id.is_empty() {
return Err("missing frame id");
}
if frame.sender_peer_id.is_empty() {
return Err("missing sender peer id");
}
if frame.sender_peer_id.contains(':') {
return Err("invalid sender peer id");
}
if frame.htl == 0 || frame.htl > MESH_MAX_HTL {
return Err("invalid htl");
}
let event = frame.event();
if event.kind != Kind::Custom(NOSTR_KIND_HASHTREE)
&& event.kind != Kind::Ephemeral(NOSTR_KIND_HASHTREE)
{
return Err("unsupported event kind");
}
Ok(())
}
pub const DATA_CHANNEL_LABEL: &str = "hashtree";