use crate::{PeerId, Multiaddr, Result, P2PError};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant, SystemTime};
use sha2::{Digest, Sha256};
use tokio::sync::RwLock;
use tracing::{debug, info};
use futures;
pub mod skademlia;
pub mod ipv6_identity;
/// Tunable parameters for the Kademlia-style DHT.
#[derive(Debug, Clone)]
pub struct DHTConfig {
/// Number of nodes a record is replicated to (Kademlia's `k`).
pub replication_factor: usize,
/// Maximum number of nodes held per k-bucket.
pub bucket_size: usize,
/// Parallelism factor for iterative lookups (queries in flight per round).
pub alpha: usize,
/// Lifetime of a stored record before it is considered expired.
pub record_ttl: Duration,
/// How often idle buckets are refreshed (see `DHT::refresh_buckets`).
pub bucket_refresh_interval: Duration,
/// How often records should be republished to their closest nodes.
pub republish_interval: Duration,
/// NOTE(review): never read in this file — presumably a maximum XOR
/// distance bound in bits; confirm against other users before relying on it.
pub max_distance: u8,
}
/// 256-bit DHT key: a SHA-256 digest used both for content addressing and
/// as the identifier in the XOR-metric key space.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Key {
// Raw SHA-256 digest bytes; bit/byte 0 is treated as most significant
// by `leading_zeros`/`bucket_index`.
hash: [u8; 32],
}
/// A value stored in the DHT, with provenance and expiry metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Record {
pub key: Key,
pub value: Vec<u8>,
/// Peer that originally published this record.
pub publisher: PeerId,
pub created_at: SystemTime,
/// Wall-clock expiry; checked by `Record::is_expired`.
pub expires_at: SystemTime,
/// Optional signature. Currently a placeholder — `Record::sign` writes a
/// dummy value and `verify` only checks for presence.
pub signature: Option<Vec<u8>>,
}
/// Runtime view of a remote peer as held in the routing table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DHTNode {
pub peer_id: PeerId,
pub addresses: Vec<Multiaddr>,
// Monotonic timestamp of last contact. Serialized through the lossy
// `instant_as_secs` adapter (stored as elapsed whole seconds, re-anchored
// to the deserializer's clock).
#[serde(with = "instant_as_secs")]
pub last_seen: Instant,
/// XOR distance from the local node's key, fixed at construction time.
pub distance: Key,
pub is_connected: bool,
}
mod instant_as_secs {
    //! Serde adapter that persists an `Instant` as the number of whole
    //! seconds elapsed since it was created.
    //!
    //! `Instant` is an opaque monotonic timestamp and cannot be serialized
    //! directly, so its *age* is stored instead. Round-tripping is lossy:
    //! sub-second precision is dropped and the value is re-anchored to the
    //! deserializing process's clock.
    use serde::{Deserializer, Serializer, Deserialize, Serialize};
    use std::time::{Duration, Instant};

    /// Serialize an `Instant` as its age in whole seconds.
    pub fn serialize<S>(instant: &Instant, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        instant.elapsed().as_secs().serialize(serializer)
    }

    /// Deserialize an age in seconds back into an `Instant` that far in the
    /// past.
    ///
    /// Uses `checked_sub` because `Instant::now() - duration` panics when the
    /// result would precede the platform's instant epoch (possible shortly
    /// after boot, or with a corrupt/hostile `secs` value from the wire); in
    /// that case the value is clamped to "now" instead of crashing.
    pub fn deserialize<'de, D>(deserializer: D) -> Result<Instant, D::Error>
    where
        D: Deserializer<'de>,
    {
        let secs = u64::deserialize(deserializer)?;
        let now = Instant::now();
        Ok(now.checked_sub(Duration::from_secs(secs)).unwrap_or(now))
    }
}
/// Wire/storage-friendly form of `DHTNode`: `last_seen` is expressed as an
/// age in whole seconds instead of an opaque monotonic `Instant`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableDHTNode {
pub peer_id: PeerId,
pub addresses: Vec<Multiaddr>,
/// Seconds since this node was last seen (at snapshot time).
pub last_seen_secs: u64,
pub distance: Key,
pub is_connected: bool,
}
/// One Kademlia k-bucket: a bounded, recency-ordered list of nodes sharing a
/// distance range from the local key.
#[derive(Debug)]
struct KBucket {
// Front = most recently seen (maintained by `add_node`).
nodes: VecDeque<DHTNode>,
// Maximum number of nodes retained (the `k` / bucket_size parameter).
capacity: usize,
// Last time this bucket was refreshed by a lookup.
last_refresh: Instant,
}
/// 256-bucket Kademlia routing table, bucketed by XOR distance from
/// `local_id`.
#[derive(Debug)]
pub struct RoutingTable {
local_id: Key,
// One bucket per distance-prefix length; each individually locked so
// concurrent operations only contend per bucket.
buckets: Vec<RwLock<KBucket>>,
#[allow(dead_code)]
config: DHTConfig,
}
/// Local key → record store for this node's share of the DHT data.
#[derive(Debug)]
pub struct DHTStorage {
// Expired records are retained until `cleanup_expired` runs; readers must
// check `Record::is_expired` themselves.
records: RwLock<HashMap<Key, Record>>,
#[allow(dead_code)]
config: DHTConfig,
}
/// The DHT node: routing table plus local record storage, with optional
/// S/Kademlia hardening and IPv6-identity verification layers.
#[derive(Debug)]
pub struct DHT {
local_id: Key,
routing_table: RoutingTable,
storage: DHTStorage,
#[allow(dead_code)]
config: DHTConfig,
/// Enabled via `new_with_security` / `new_with_ipv6_security`.
pub skademlia: Option<skademlia::SKademlia>,
/// Enabled via `new_with_ipv6_security`.
pub ipv6_identity_manager: Option<ipv6_identity::IPv6DHTIdentityManager>,
}
/// DHT request messages (see `DHT::handle_query` for the dispatch).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTQuery {
/// Ask for the nodes closest to `key`.
FindNode {
key: Key,
requester: PeerId
},
/// Ask for the value under `key`, falling back to closest nodes.
FindValue {
key: Key,
requester: PeerId
},
/// Ask the receiver to store `record` locally.
Store {
record: Record,
requester: PeerId
},
/// Liveness probe.
Ping {
requester: PeerId
},
}
/// DHT reply messages, paired with `DHTQuery` variants.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DHTResponse {
/// Closest known nodes (answer to FindNode, or FindValue on a miss).
Nodes {
nodes: Vec<SerializableDHTNode>
},
/// The requested record (answer to FindValue on a hit).
Value {
record: Record
},
/// Outcome of a Store request.
Stored {
success: bool
},
/// Answer to Ping.
Pong {
responder: PeerId
},
Error {
message: String
},
}
/// Book-keeping for an iterative lookup. The methods used by the lookups in
/// this file (`new`, `add_nodes`, `next_nodes`, `is_complete`) are defined
/// elsewhere in this module.
#[derive(Debug)]
pub struct LookupState {
pub target: Key,
/// Peers already queried, with when each query was issued.
pub queried: HashMap<PeerId, Instant>,
/// Frontier of candidates not yet queried.
pub to_query: VecDeque<DHTNode>,
/// Best candidate nodes accumulated so far.
pub closest: Vec<DHTNode>,
pub started_at: Instant,
/// Parallelism factor (queries in flight per round).
pub alpha: usize,
}
impl Default for DHTConfig {
    /// Standard Kademlia-style defaults: k = 20, alpha = 3, 24-hour record
    /// TTL and republish interval, hourly bucket refresh.
    fn default() -> Self {
        const HOUR: Duration = Duration::from_secs(60 * 60);
        const DAY: Duration = Duration::from_secs(24 * 60 * 60);
        Self {
            replication_factor: 20,
            bucket_size: 20,
            alpha: 3,
            record_ttl: DAY,
            bucket_refresh_interval: HOUR,
            republish_interval: DAY,
            max_distance: 160,
        }
    }
}
impl Key {
    /// Build a key by SHA-256 hashing arbitrary bytes.
    pub fn new(data: &[u8]) -> Self {
        let digest = Sha256::digest(data);
        Self { hash: digest.into() }
    }

    /// Wrap a pre-computed 32-byte digest.
    pub fn from_hash(hash: [u8; 32]) -> Self {
        Self { hash }
    }

    /// Generate a uniformly random key (used e.g. for refresh probes).
    pub fn random() -> Self {
        use rand::RngCore;
        let mut hash = [0u8; 32];
        rand::thread_rng().fill_bytes(&mut hash);
        Self { hash }
    }

    /// Borrow the raw 32-byte digest.
    pub fn as_bytes(&self) -> &[u8] {
        &self.hash
    }

    /// Hex-encode the key for logging and display.
    pub fn to_hex(&self) -> String {
        hex::encode(self.hash)
    }

    /// XOR distance metric between two keys.
    pub fn distance(&self, other: &Key) -> Key {
        let mut hash = [0u8; 32];
        for (out, (a, b)) in hash.iter_mut().zip(self.hash.iter().zip(other.hash.iter())) {
            *out = a ^ b;
        }
        Key { hash }
    }

    /// Number of leading zero bits, counting from the MSB of byte 0;
    /// returns 256 for the all-zero key.
    pub fn leading_zeros(&self) -> u32 {
        self.hash
            .iter()
            .position(|&byte| byte != 0)
            .map(|i| (i as u32) * 8 + self.hash[i].leading_zeros())
            .unwrap_or(256)
    }

    /// Map this key into one of the 256 k-buckets relative to `local_id`.
    /// The index is derived from the highest differing bit; distances with
    /// 255 or more leading zeros are clamped into bucket 255 (this file's
    /// convention — kept as-is because add/lookup both use it).
    pub fn bucket_index(&self, local_id: &Key) -> usize {
        let lz = self.distance(local_id).leading_zeros();
        if lz >= 255 {
            255
        } else {
            (255 - lz) as usize
        }
    }
}
impl Record {
    /// Create a record with the default 24-hour TTL.
    pub fn new(key: Key, value: Vec<u8>, publisher: PeerId) -> Self {
        Self::with_ttl(key, value, publisher, Duration::from_secs(24 * 60 * 60))
    }

    /// Create a record that expires `ttl` from now.
    pub fn with_ttl(key: Key, value: Vec<u8>, publisher: PeerId, ttl: Duration) -> Self {
        let created_at = SystemTime::now();
        Self {
            key,
            value,
            publisher,
            created_at,
            expires_at: created_at + ttl,
            signature: None,
        }
    }

    /// True once the wall clock has passed `expires_at`.
    pub fn is_expired(&self) -> bool {
        self.expires_at < SystemTime::now()
    }

    /// Time since creation; zero if the system clock moved backwards.
    pub fn age(&self) -> Duration {
        SystemTime::now()
            .duration_since(self.created_at)
            .unwrap_or(Duration::ZERO)
    }

    /// Placeholder signing: stores a dummy 64-byte signature.
    /// NOTE(review): not a real signature — replace before relying on it.
    pub fn sign(&mut self, _private_key: &[u8]) -> Result<()> {
        self.signature = Some(vec![0u8; 64]);
        Ok(())
    }

    /// Placeholder verification: only checks that *some* signature exists.
    pub fn verify(&self, _public_key: &[u8]) -> bool {
        self.signature.is_some()
    }
}
impl DHTNode {
    /// Build a node entry, deriving its key (SHA-256 of the peer id) and
    /// XOR distance to `local_id`.
    pub fn new(peer_id: PeerId, addresses: Vec<Multiaddr>, local_id: &Key) -> Self {
        let distance = Key::new(peer_id.as_bytes()).distance(local_id);
        Self::new_with_key(peer_id, addresses, distance)
    }

    /// Build a node entry with a pre-computed distance key.
    pub fn new_with_key(peer_id: PeerId, addresses: Vec<Multiaddr>, key: Key) -> Self {
        Self {
            peer_id,
            addresses,
            last_seen: Instant::now(),
            distance: key,
            is_connected: false,
        }
    }

    /// Mark the node as freshly seen.
    pub fn touch(&mut self) {
        self.last_seen = Instant::now();
    }

    /// True when the node has not been seen within `timeout`.
    pub fn is_stale(&self, timeout: Duration) -> bool {
        self.last_seen.elapsed() > timeout
    }

    /// The node's DHT key (SHA-256 of its peer id).
    pub fn key(&self) -> Key {
        Key::new(self.peer_id.as_bytes())
    }

    /// Snapshot into the serializable representation, converting the
    /// monotonic `last_seen` into an age in whole seconds.
    pub fn to_serializable(&self) -> SerializableDHTNode {
        SerializableDHTNode {
            peer_id: self.peer_id.clone(),
            addresses: self.addresses.clone(),
            last_seen_secs: self.last_seen.elapsed().as_secs(),
            distance: self.distance.clone(),
            is_connected: self.is_connected,
        }
    }
}
impl SerializableDHTNode {
    /// Convert back into a runtime `DHTNode`.
    ///
    /// `last_seen` is reconstructed as "`last_seen_secs` seconds before now".
    /// `checked_sub` is used because `Instant::now() - duration` panics when
    /// the result would precede the platform's instant epoch (possible
    /// shortly after boot, or with a hostile/corrupt `last_seen_secs` from
    /// the wire); in that case we clamp to "now" rather than crash.
    pub fn to_dht_node(&self) -> DHTNode {
        let now = Instant::now();
        let last_seen = now
            .checked_sub(Duration::from_secs(self.last_seen_secs))
            .unwrap_or(now);
        DHTNode {
            peer_id: self.peer_id.clone(),
            addresses: self.addresses.clone(),
            last_seen,
            distance: self.distance.clone(),
            is_connected: self.is_connected,
        }
    }
}
impl KBucket {
    /// Empty bucket holding at most `capacity` nodes.
    fn new(capacity: usize) -> Self {
        Self {
            nodes: VecDeque::new(),
            capacity,
            last_refresh: Instant::now(),
        }
    }

    /// Insert or refresh a node; the most recently seen node lives at the
    /// front. Returns `false` when the node is new and the bucket is full
    /// (no eviction is attempted).
    fn add_node(&mut self, node: DHTNode) -> bool {
        match self.nodes.iter().position(|n| n.peer_id == node.peer_id) {
            Some(pos) => {
                // Known peer: refresh timestamp and connection state, then
                // move it to the front.
                let mut entry = self.nodes.remove(pos).unwrap();
                entry.touch();
                entry.is_connected = node.is_connected;
                self.nodes.push_front(entry);
                true
            }
            None if self.nodes.len() < self.capacity => {
                self.nodes.push_front(node);
                true
            }
            None => false,
        }
    }

    /// Drop a node by peer id; returns whether it was present.
    fn remove_node(&mut self, peer_id: &PeerId) -> bool {
        match self.nodes.iter().position(|n| n.peer_id == *peer_id) {
            Some(pos) => {
                self.nodes.remove(pos);
                true
            }
            None => false,
        }
    }

    /// Up to `count` nodes from this bucket, ordered by XOR distance to
    /// `target` (closest first).
    fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
        let mut sorted: Vec<DHTNode> = self.nodes.iter().cloned().collect();
        sorted.sort_by_key(|n| n.key().distance(target).as_bytes().to_vec());
        sorted.truncate(count);
        sorted
    }

    /// Whether the bucket has gone `interval` without being refreshed.
    fn needs_refresh(&self, interval: Duration) -> bool {
        self.last_refresh.elapsed() > interval
    }
}
impl RoutingTable {
/// Build a table with 256 empty buckets sized per `config.bucket_size`.
pub fn new(local_id: Key, config: DHTConfig) -> Self {
let mut buckets = Vec::new();
for _ in 0..256 {
buckets.push(RwLock::new(KBucket::new(config.bucket_size)));
}
Self {
local_id,
buckets,
config,
}
}
/// Insert (or refresh) a node in its distance bucket. Returns Ok even when
/// the bucket is full and the node was dropped — only a debug log differs.
pub async fn add_node(&self, node: DHTNode) -> Result<()> {
let bucket_index = node.key().bucket_index(&self.local_id);
let mut bucket = self.buckets[bucket_index].write().await;
if bucket.add_node(node.clone()) {
debug!("Added node {} to bucket {}", node.peer_id, bucket_index);
} else {
debug!("Bucket {} full, could not add node {}", bucket_index, node.peer_id);
}
Ok(())
}
/// Remove a peer from the bucket its key maps to, if present.
pub async fn remove_node(&self, peer_id: &PeerId) -> Result<()> {
let node_key = Key::new(peer_id.as_bytes());
let bucket_index = node_key.bucket_index(&self.local_id);
let mut bucket = self.buckets[bucket_index].write().await;
if bucket.remove_node(peer_id) {
debug!("Removed node {} from bucket {}", peer_id, bucket_index);
}
Ok(())
}
/// Collect the `count` nodes closest to `target` across buckets.
///
/// Walks outward from the target's bucket (breadth-first over neighboring
/// indices) until at least `2 * count` candidates are gathered or all
/// buckets are visited, then sorts the pool by XOR distance and truncates.
/// The early-exit means the candidate pool depends on traversal order —
/// kept exactly as written.
pub async fn closest_nodes(&self, target: &Key, count: usize) -> Vec<DHTNode> {
let mut all_nodes = Vec::new();
let target_bucket = target.bucket_index(&self.local_id);
let mut checked = vec![false; 256];
let mut to_check = VecDeque::new();
to_check.push_back(target_bucket);
while let Some(bucket_idx) = to_check.pop_front() {
if checked[bucket_idx] {
continue;
}
checked[bucket_idx] = true;
let bucket = self.buckets[bucket_idx].read().await;
// Takes every node in the bucket (count = bucket length).
all_nodes.extend(bucket.closest_nodes(target, bucket.nodes.len()));
if bucket_idx > 0 && !checked[bucket_idx - 1] {
to_check.push_back(bucket_idx - 1);
}
if bucket_idx < 255 && !checked[bucket_idx + 1] {
to_check.push_back(bucket_idx + 1);
}
// Stop expanding once the pool is comfortably larger than requested.
if all_nodes.len() >= count * 2 {
break;
}
}
all_nodes.sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
all_nodes.into_iter().take(count).collect()
}
/// (total nodes across all buckets, number of non-empty buckets).
pub async fn stats(&self) -> (usize, usize) {
let mut total_nodes = 0;
let mut active_buckets = 0;
for bucket in &self.buckets {
let bucket_guard = bucket.read().await;
let node_count = bucket_guard.nodes.len();
total_nodes += node_count;
if node_count > 0 {
active_buckets += 1;
}
}
(total_nodes, active_buckets)
}
}
impl DHTStorage {
    /// New, empty record store.
    pub fn new(config: DHTConfig) -> Self {
        Self {
            records: RwLock::new(HashMap::new()),
            config,
        }
    }

    /// Insert or overwrite the record under its own key.
    pub async fn store(&self, record: Record) -> Result<()> {
        self.records.write().await.insert(record.key.clone(), record);
        Ok(())
    }

    /// Fetch a record by key. Expired records are returned as-is; callers
    /// are expected to check `Record::is_expired`.
    pub async fn get(&self, key: &Key) -> Option<Record> {
        self.records.read().await.get(key).cloned()
    }

    /// Drop every expired record, returning how many were removed.
    pub async fn cleanup_expired(&self) -> usize {
        let mut guard = self.records.write().await;
        let before = guard.len();
        guard.retain(|_, record| !record.is_expired());
        before - guard.len()
    }

    /// Clone out every stored record.
    pub async fn all_records(&self) -> Vec<Record> {
        self.records.read().await.values().cloned().collect()
    }

    /// (total records, records already expired but not yet cleaned up).
    pub async fn stats(&self) -> (usize, usize) {
        let guard = self.records.read().await;
        let expired = guard.values().filter(|r| r.is_expired()).count();
        (guard.len(), expired)
    }
}
impl DHT {
pub fn new(local_id: Key, config: DHTConfig) -> Self {
let routing_table = RoutingTable::new(local_id.clone(), config.clone());
let storage = DHTStorage::new(config.clone());
Self {
local_id,
routing_table,
storage,
config,
skademlia: None,
ipv6_identity_manager: None,
}
}
pub fn new_with_security(local_id: Key, config: DHTConfig, skademlia_config: skademlia::SKademliaConfig) -> Self {
let routing_table = RoutingTable::new(local_id.clone(), config.clone());
let storage = DHTStorage::new(config.clone());
let skademlia = skademlia::SKademlia::new(skademlia_config);
Self {
local_id,
routing_table,
storage,
config,
skademlia: Some(skademlia),
ipv6_identity_manager: None,
}
}
pub fn new_with_ipv6_security(
local_id: Key,
config: DHTConfig,
skademlia_config: skademlia::SKademliaConfig,
ipv6_config: ipv6_identity::IPv6DHTConfig
) -> Self {
let routing_table = RoutingTable::new(local_id.clone(), config.clone());
let storage = DHTStorage::new(config.clone());
let skademlia = skademlia::SKademlia::new(skademlia_config);
let ipv6_identity_manager = ipv6_identity::IPv6DHTIdentityManager::new(ipv6_config);
Self {
local_id,
routing_table,
storage,
config,
skademlia: Some(skademlia),
ipv6_identity_manager: Some(ipv6_identity_manager),
}
}
pub fn set_local_ipv6_identity(&mut self, identity: crate::security::IPv6NodeID) -> Result<()> {
if let Some(ref mut manager) = self.ipv6_identity_manager {
self.local_id = ipv6_identity::IPv6DHTIdentityManager::generate_dht_key(&identity);
manager.set_local_identity(identity)?;
info!("Local IPv6 identity set and DHT key updated");
Ok(())
} else {
Err(P2PError::Security("IPv6 identity manager not enabled".to_string()).into())
}
}
pub async fn add_bootstrap_node(&self, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
let node = DHTNode::new(peer_id, addresses, &self.local_id);
self.routing_table.add_node(node).await
}
/// Add a node after validating its IPv6 identity with the manager.
///
/// The manager's verdict arrives as an `IPv6SecurityEvent`: only `NodeJoined`
/// results in insertion (with the node's distance re-derived from its
/// IPv6-based DHT key); every other event maps to a security error.
/// Without an identity manager this degrades to `add_bootstrap_node`.
pub async fn add_ipv6_node(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>, ipv6_identity: crate::security::IPv6NodeID) -> Result<()> {
if let Some(ref mut manager) = self.ipv6_identity_manager {
let base_node = DHTNode::new(peer_id.clone(), addresses, &self.local_id);
let security_event = manager.validate_node_join(&base_node, &ipv6_identity).await?;
match security_event {
ipv6_identity::IPv6SecurityEvent::NodeJoined { verification_confidence, .. } => {
// Recompute the routing distance from the IPv6-derived key rather
// than the plain peer-id hash.
let ipv6_node = manager.enhance_dht_node(base_node.clone(), ipv6_identity).await?;
let mut enhanced_base_node = base_node;
enhanced_base_node.distance = ipv6_node.get_dht_key().distance(&self.local_id);
self.routing_table.add_node(enhanced_base_node).await?;
info!("Added IPv6-verified node {} with confidence {:.2}", peer_id, verification_confidence);
Ok(())
}
ipv6_identity::IPv6SecurityEvent::VerificationFailed { reason, .. } => {
Err(P2PError::Security(format!("IPv6 verification failed: {}", reason)).into())
}
ipv6_identity::IPv6SecurityEvent::DiversityViolation { subnet_type, .. } => {
Err(P2PError::Security(format!("IP diversity violation: {}", subnet_type)).into())
}
ipv6_identity::IPv6SecurityEvent::NodeBanned { reason, .. } => {
Err(P2PError::Security(format!("Node banned: {}", reason)).into())
}
_ => {
Err(P2PError::Security("Unexpected security event".to_string()).into())
}
}
} else {
// No manager configured: fall back to the unverified path.
self.add_bootstrap_node(peer_id, addresses).await
}
}
pub async fn remove_ipv6_node(&mut self, peer_id: &PeerId) -> Result<()> {
self.routing_table.remove_node(peer_id).await?;
if let Some(ref mut manager) = self.ipv6_identity_manager {
manager.remove_node(peer_id);
}
Ok(())
}
pub fn is_node_banned(&self, peer_id: &PeerId) -> bool {
if let Some(ref manager) = self.ipv6_identity_manager {
manager.is_node_banned(peer_id)
} else {
false
}
}
/// Store `value` under `key`: always kept locally, then replicated to the
/// `replication_factor` closest known nodes.
///
/// Succeeds with no peers at all (local-only), otherwise requires at least
/// `max(1, n/2)` of the `n` targeted nodes to accept the record.
pub async fn put(&self, key: Key, value: Vec<u8>) -> Result<()> {
let record = Record::new(key.clone(), value, self.local_id.to_hex());
// Local copy first, so the record survives even if replication fails.
self.storage.store(record.clone()).await?;
let closest_nodes = self.routing_table
.closest_nodes(&key, self.config.replication_factor)
.await;
info!("Storing record with key {} on {} nodes", key.to_hex(), closest_nodes.len());
if closest_nodes.is_empty() {
info!("No other nodes available for replication, storing only locally");
return Ok(());
}
let mut successful_replications = 0;
for node in &closest_nodes {
if self.replicate_record(&record, node).await.is_ok() {
successful_replications += 1;
}
}
info!("Successfully replicated record {} to {}/{} nodes",
key.to_hex(), successful_replications, closest_nodes.len());
// Majority-ish quorum: 1 for a single target, otherwise half (rounded down,
// minimum 1).
let required_replications = if closest_nodes.len() == 1 {
1
} else {
std::cmp::max(1, closest_nodes.len() / 2)
};
if successful_replications >= required_replications {
Ok(())
} else {
Err(P2PError::DHT(format!(
"Insufficient replication: only {}/{} nodes stored the record (required: {})",
successful_replications, closest_nodes.len(), required_replications
)).into())
}
}
pub async fn get(&self, key: &Key) -> Option<Record> {
if let Some(record) = self.storage.get(key).await {
if !record.is_expired() {
return Some(record);
}
}
if let Some(record) = self.iterative_find_value(key).await {
if self.storage.store(record.clone()).await.is_ok() {
debug!("Cached retrieved record with key {}", key.to_hex());
}
return Some(record);
}
None
}
pub async fn find_node(&self, key: &Key) -> Vec<DHTNode> {
self.routing_table.closest_nodes(key, self.config.replication_factor).await
}
/// Answer an incoming DHT query against local state.
///
/// FindValue returns the record only when present *and* fresh; otherwise it
/// degrades to a FindNode-style answer with our closest known nodes.
pub async fn handle_query(&self, query: DHTQuery) -> DHTResponse {
match query {
DHTQuery::FindNode { key, requester: _ } => {
let nodes = self.find_node(&key).await;
let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
DHTResponse::Nodes { nodes: serializable_nodes }
}
DHTQuery::FindValue { key, requester: _ } => {
if let Some(record) = self.storage.get(&key).await {
if !record.is_expired() {
return DHTResponse::Value { record };
}
}
// Miss (or expired): answer with the closest nodes instead.
let nodes = self.find_node(&key).await;
let serializable_nodes = nodes.into_iter().map(|n| n.to_serializable()).collect();
DHTResponse::Nodes { nodes: serializable_nodes }
}
DHTQuery::Store { record, requester: _ } => {
match self.storage.store(record).await {
Ok(()) => DHTResponse::Stored { success: true },
Err(_) => DHTResponse::Stored { success: false },
}
}
DHTQuery::Ping { requester: _ } => {
DHTResponse::Pong { responder: self.local_id.to_hex() }
}
}
}
pub async fn stats(&self) -> DHTStats {
let (total_nodes, active_buckets) = self.routing_table.stats().await;
let (stored_records, expired_records) = self.storage.stats().await;
DHTStats {
local_id: self.local_id.clone(),
total_nodes,
active_buckets,
stored_records,
expired_records,
}
}
pub async fn maintenance(&self) -> Result<()> {
let expired_count = self.storage.cleanup_expired().await;
if expired_count > 0 {
debug!("Cleaned up {} expired records", expired_count);
}
self.republish_records().await?;
self.refresh_buckets().await?;
Ok(())
}
/// Hardened lookup using S/Kademlia: secure node selection plus optional
/// distance-consensus verification before querying each node.
///
/// Falls back to plain `get` when S/Kademlia is disabled. Returns the first
/// record obtained from a node that passes verification.
pub async fn secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
if let Some(record) = self.storage.get(key).await {
if !record.is_expired() {
return Ok(Some(record));
}
}
// Copy the needed config values out first so the `skademlia` borrow is
// released before the later `&mut self` calls.
let (enable_distance_verification, disjoint_path_count, min_reputation) = if let Some(ref skademlia) = self.skademlia {
(skademlia.config.enable_distance_verification,
skademlia.config.disjoint_path_count,
skademlia.config.min_routing_reputation)
} else {
return Ok(self.get(key).await);
};
// Oversample candidates (3x the number of disjoint paths).
let initial_nodes = self.routing_table
.closest_nodes(key, disjoint_path_count * 3)
.await;
if initial_nodes.is_empty() {
return Ok(None);
}
let secure_nodes = if let Some(ref mut skademlia) = self.skademlia {
skademlia.secure_lookup(key.clone(), initial_nodes).await?
} else {
return Ok(None);
};
for node in &secure_nodes {
if enable_distance_verification {
// Ask up to 3 nearby witnesses to corroborate the node's claimed
// distance; skip the node on low consensus confidence.
let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
let consensus = if let Some(ref mut skademlia) = self.skademlia {
skademlia.verify_distance_consensus(&node.peer_id, key, witness_nodes).await?
} else {
continue;
};
if consensus.confidence < min_reputation {
debug!("Skipping node {} due to low distance verification confidence", node.peer_id);
continue;
}
}
let query = DHTQuery::FindValue {
key: key.clone(),
requester: self.local_id.to_hex()
};
if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
// Cache best-effort; the lookup result is returned regardless.
let _ = self.storage.store(record.clone()).await;
return Ok(Some(record));
}
}
Ok(None)
}
/// Hardened store: S/Kademlia picks the replica set from an oversampled
/// candidate pool, and each replication outcome feeds the peers' reputation.
///
/// Unlike `put`, this does not enforce a replication quorum — it returns Ok
/// even when zero replications succeed.
pub async fn secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
let record = Record::new(key.clone(), value, self.local_id.to_hex());
// Local copy first.
self.storage.store(record.clone()).await?;
let secure_nodes = if let Some(ref skademlia) = self.skademlia {
// Oversample 2x, then let S/Kademlia choose the final replica set.
let candidate_nodes = self.routing_table
.closest_nodes(&key, self.config.replication_factor * 2)
.await;
skademlia.select_secure_nodes(
&candidate_nodes,
&key,
self.config.replication_factor
)
} else {
self.routing_table.closest_nodes(&key, self.config.replication_factor).await
};
info!("Storing record with key {} on {} secure nodes", key.to_hex(), secure_nodes.len());
let mut replication_results = Vec::new();
let mut successful_replications = 0;
for node in &secure_nodes {
let success = self.replicate_record(&record, node).await.is_ok();
replication_results.push((node.peer_id.clone(), success));
if success {
successful_replications += 1;
}
}
// Feed per-peer success/failure into the reputation system.
// NOTE(review): the 100 ms latency passed here is a fixed placeholder,
// not a measured round-trip time.
if let Some(ref mut skademlia) = self.skademlia {
for (peer_id, success) in replication_results {
skademlia.reputation_manager.update_reputation(
&peer_id,
success,
Duration::from_millis(100)
);
}
}
if successful_replications > 0 {
info!("Successfully replicated to {}/{} secure nodes",
successful_replications, secure_nodes.len());
}
Ok(())
}
pub async fn update_sibling_list(&mut self, key: Key) -> Result<()> {
if let Some(ref mut skademlia) = self.skademlia {
let nodes = self.routing_table.closest_nodes(&key, skademlia.config.sibling_list_size).await;
skademlia.update_sibling_list(key, nodes);
}
Ok(())
}
pub async fn validate_routing_consistency(&self) -> Result<skademlia::ConsistencyReport> {
if let Some(ref skademlia) = self.skademlia {
let sample_key = Key::random();
let sample_nodes = self.routing_table.closest_nodes(&sample_key, 100).await;
skademlia.validate_routing_consistency(&sample_nodes).await
} else {
Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
}
}
pub fn create_distance_challenge(&mut self, peer_id: &PeerId, key: &Key) -> Option<skademlia::DistanceChallenge> {
self.skademlia.as_mut()
.map(|skademlia| skademlia.create_distance_challenge(peer_id, key))
}
pub fn verify_distance_proof(&self, proof: &skademlia::DistanceProof) -> Result<bool> {
if let Some(ref skademlia) = self.skademlia {
skademlia.verify_distance_proof(proof)
} else {
Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
}
}
#[allow(dead_code)]
/// Filter `nodes` down to those passing distance verification.
///
/// NOTE(review): this is a stub — `consensus_confidence` is hard-coded to
/// 0.8 instead of being computed from the witnesses, so the outcome depends
/// only on whether at least two witnesses exist and on `min_reputation`.
async fn verify_node_distances(&self, nodes: &[DHTNode], _target_key: &Key, min_reputation: f64) -> Result<Vec<DHTNode>> {
let mut verified_nodes = Vec::new();
for node in nodes {
let witness_nodes = self.select_witness_nodes(&node.peer_id, 3).await;
// Require at least two witnesses before "verifying" at all.
if witness_nodes.len() >= 2 {
let consensus_confidence = 0.8;
if consensus_confidence >= min_reputation {
verified_nodes.push(node.clone());
} else {
debug!("Node {} failed distance verification with confidence {}",
node.peer_id, consensus_confidence);
}
}
}
Ok(verified_nodes)
}
async fn select_witness_nodes(&self, target_peer: &PeerId, count: usize) -> Vec<PeerId> {
let target_key = Key::new(target_peer.as_bytes());
let candidate_nodes = self.routing_table.closest_nodes(&target_key, count * 2).await;
candidate_nodes.into_iter()
.filter(|node| node.peer_id != *target_peer)
.take(count)
.map(|node| node.peer_id)
.collect()
}
pub fn create_enhanced_distance_challenge(&mut self, peer_id: &PeerId, key: &Key, suspected_attack: bool) -> Option<skademlia::EnhancedDistanceChallenge> {
if let Some(ref mut skademlia) = self.skademlia {
Some(skademlia.create_adaptive_distance_challenge(peer_id, key, suspected_attack))
} else {
None
}
}
pub async fn verify_distance_multi_round(&mut self, challenge: &skademlia::EnhancedDistanceChallenge) -> Result<bool> {
if let Some(ref mut skademlia) = self.skademlia {
skademlia.verify_distance_multi_round(challenge).await
} else {
Err(P2PError::DHT("S/Kademlia not enabled".to_string()).into())
}
}
pub fn get_security_bucket(&mut self, key: &Key) -> Option<&mut skademlia::SecurityBucket> {
self.skademlia.as_mut()
.map(|skademlia| skademlia.get_security_bucket(key))
}
pub async fn add_trusted_node(&mut self, key: &Key, peer_id: PeerId, addresses: Vec<Multiaddr>) -> Result<()> {
if let Some(ref mut skademlia) = self.skademlia {
let node = DHTNode::new(peer_id, addresses, &self.local_id);
let security_bucket = skademlia.get_security_bucket(key);
security_bucket.add_trusted_node(node);
}
Ok(())
}
/// Lookup restricted to IPv6-verified peers, layered on top of S/Kademlia's
/// secure lookup. Falls back to `secure_get` when no verified candidates
/// exist. Successful answers raise the responding peer's IPv6 reputation.
pub async fn ipv6_secure_get(&mut self, key: &Key) -> Result<Option<Record>> {
// Refuse to operate while our own peer id is on the ban list.
if self.is_node_banned(&self.local_id.to_hex()) {
return Err(P2PError::Security("Local node is banned".to_string()).into());
}
if let Some(record) = self.storage.get(key).await {
if !record.is_expired() {
return Ok(Some(record));
}
}
let verified_nodes = self.get_ipv6_verified_nodes_for_key(key).await?;
if verified_nodes.is_empty() {
return self.secure_get(key).await;
}
if let Some(ref mut skademlia) = self.skademlia {
let secure_nodes = skademlia.secure_lookup(key.clone(), verified_nodes).await?;
for node in &secure_nodes {
// Re-check each node right before querying: it must still have a
// verified, non-stale IPv6 identity.
if let Some(ref manager) = self.ipv6_identity_manager {
if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
if ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
debug!("Skipping node {} due to stale IPv6 identity", node.peer_id);
continue;
}
} else {
debug!("Skipping node {} without verified IPv6 identity", node.peer_id);
continue;
}
}
let query = DHTQuery::FindValue {
key: key.clone(),
requester: self.local_id.to_hex()
};
if let Ok(DHTResponse::Value { record }) = self.simulate_query(node, query).await {
// Reward the peer that served the value.
if let Some(ref mut manager) = self.ipv6_identity_manager {
manager.update_ipv6_reputation(&node.peer_id, true);
}
// Cache best-effort.
let _ = self.storage.store(record.clone()).await;
return Ok(Some(record));
}
}
}
Ok(None)
}
/// Store restricted to IPv6-verified peers. Each replication outcome feeds
/// the peer's IPv6 reputation. Errors only when there *were* verified
/// targets and none of them accepted the record; with zero targets the
/// record is still kept locally and Ok is returned.
pub async fn ipv6_secure_put(&mut self, key: Key, value: Vec<u8>) -> Result<()> {
// Refuse to operate while our own peer id is on the ban list.
if self.is_node_banned(&self.local_id.to_hex()) {
return Err(P2PError::Security("Local node is banned".to_string()).into());
}
let record = Record::new(key.clone(), value, self.local_id.to_hex());
// Local copy first.
self.storage.store(record.clone()).await?;
let verified_nodes = self.get_ipv6_verified_nodes_for_key(&key).await?;
let secure_nodes = if let Some(ref skademlia) = self.skademlia {
skademlia.select_secure_nodes(&verified_nodes, &key, self.config.replication_factor)
} else {
verified_nodes.into_iter().take(self.config.replication_factor).collect()
};
info!("Storing record with key {} on {} IPv6-verified secure nodes", key.to_hex(), secure_nodes.len());
let mut successful_replications = 0;
for node in &secure_nodes {
let success = self.replicate_record(&record, node).await.is_ok();
if let Some(ref mut manager) = self.ipv6_identity_manager {
manager.update_ipv6_reputation(&node.peer_id, success);
}
if success {
successful_replications += 1;
}
}
if successful_replications == 0 && !secure_nodes.is_empty() {
return Err(P2PError::DHT("Failed to replicate to any IPv6-verified nodes".to_string()).into());
}
info!("Successfully replicated to {}/{} IPv6-verified nodes",
successful_replications, secure_nodes.len());
Ok(())
}
async fn get_ipv6_verified_nodes_for_key(&self, key: &Key) -> Result<Vec<DHTNode>> {
let mut verified_nodes = Vec::new();
let candidate_nodes = self.routing_table.closest_nodes(key, self.config.replication_factor * 2).await;
if let Some(ref manager) = self.ipv6_identity_manager {
for node in candidate_nodes {
if let Some(ipv6_node) = manager.get_verified_node(&node.peer_id) {
if !ipv6_node.needs_identity_refresh(manager.config.identity_refresh_interval) {
if !manager.is_node_banned(&node.peer_id) {
verified_nodes.push(node);
}
}
}
}
} else {
verified_nodes = candidate_nodes;
}
Ok(verified_nodes)
}
pub fn get_ipv6_diversity_stats(&self) -> Option<crate::security::DiversityStats> {
self.ipv6_identity_manager.as_ref()
.map(|manager| manager.get_ipv6_diversity_stats())
}
pub fn cleanup_ipv6_data(&mut self) {
if let Some(ref mut manager) = self.ipv6_identity_manager {
manager.cleanup_expired();
}
}
pub fn ban_ipv6_node(&mut self, peer_id: &PeerId, reason: &str) {
if let Some(ref mut manager) = self.ipv6_identity_manager {
manager.ban_node(peer_id, reason);
}
}
pub fn get_local_ipv6_identity(&self) -> Option<&crate::security::IPv6NodeID> {
self.ipv6_identity_manager.as_ref()
.and_then(|manager| manager.get_local_identity())
}
/// Push `record` to `node`.
///
/// NOTE(review): this is simulation scaffolding, not real networking — it
/// sleeps 10 ms and then fails randomly 5% of the time. Replace with an
/// actual Store RPC before production use.
async fn replicate_record(&self, record: &Record, node: &DHTNode) -> Result<()> {
debug!("Replicating record {} to node {}", record.key.to_hex(), node.peer_id);
tokio::time::sleep(Duration::from_millis(10)).await;
if rand::random::<f64>() < 0.95 {
Ok(())
} else {
Err(P2PError::Network("Replication failed".to_string()).into())
}
}
/// Iterative Kademlia value lookup: query `alpha` nodes per round, feed any
/// returned nodes back into the frontier, and stop on the first value,
/// lookup completion, an empty frontier, or 10 rounds.
async fn iterative_find_value(&self, key: &Key) -> Option<Record> {
debug!("Starting iterative lookup for key {}", key.to_hex());
let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
// Seed the frontier from our own routing table.
let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
lookup_state.add_nodes(initial_nodes);
let mut iterations = 0;
const MAX_ITERATIONS: usize = 10;
while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
let nodes_to_query = lookup_state.next_nodes();
if nodes_to_query.is_empty() {
break;
}
// Fire this round's queries concurrently.
let mut queries = Vec::new();
for node in &nodes_to_query {
let query = DHTQuery::FindValue {
key: key.clone(),
requester: self.local_id.to_hex()
};
queries.push(self.simulate_query(node, query));
}
for query_result in futures::future::join_all(queries).await {
match query_result {
Ok(DHTResponse::Value { record }) => {
debug!("Found value for key {} in iteration {}", key.to_hex(), iterations);
return Some(record);
}
Ok(DHTResponse::Nodes { nodes }) => {
// Extend the frontier with the newly learned nodes.
let dht_nodes: Vec<DHTNode> = nodes.into_iter()
.map(|n| n.to_dht_node())
.collect();
lookup_state.add_nodes(dht_nodes);
}
_ => {
debug!("Query failed during iterative lookup");
}
}
}
iterations += 1;
}
debug!("Iterative lookup for key {} completed after {} iterations, value not found",
key.to_hex(), iterations);
None
}
async fn simulate_query(&self, _node: &DHTNode, query: DHTQuery) -> Result<DHTResponse> {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok(self.handle_query(query).await)
}
/// Re-replicate records whose remaining TTL has dropped below a quarter of
/// the configured record TTL.
///
/// Note: `republished_count` counts successful per-node replications, not
/// distinct records.
async fn republish_records(&self) -> Result<()> {
let all_records = self.storage.all_records().await;
let mut republished_count = 0;
for record in all_records {
// Zero when the record is already past its expiry.
let remaining_ttl = record.expires_at
.duration_since(SystemTime::now())
.unwrap_or(Duration::ZERO);
if remaining_ttl < self.config.record_ttl / 4 {
let closest_nodes = self.routing_table
.closest_nodes(&record.key, self.config.replication_factor)
.await;
for node in &closest_nodes {
if self.replicate_record(&record, node).await.is_ok() {
republished_count += 1;
}
}
}
}
if republished_count > 0 {
debug!("Republished {} records during maintenance", republished_count);
}
Ok(())
}
/// Refresh every bucket that has gone `bucket_refresh_interval` without
/// activity by running a node lookup toward a key in that bucket's range,
/// then stamping the bucket's `last_refresh`.
async fn refresh_buckets(&self) -> Result<()> {
let mut refreshed_count = 0;
for bucket_index in 0..256 {
// Read-lock only to test staleness; drop the guard before the lookup.
let needs_refresh = {
let bucket = self.routing_table.buckets[bucket_index].read().await;
bucket.needs_refresh(self.config.bucket_refresh_interval)
};
if needs_refresh {
let target_key = self.generate_key_for_bucket(bucket_index);
let _nodes = self.iterative_find_node(&target_key).await;
refreshed_count += 1;
{
let mut bucket = self.routing_table.buckets[bucket_index].write().await;
bucket.last_refresh = Instant::now();
}
}
}
if refreshed_count > 0 {
debug!("Refreshed {} buckets during maintenance", refreshed_count);
}
Ok(())
}
fn generate_key_for_bucket(&self, bucket_index: usize) -> Key {
let mut key_bytes = self.local_id.as_bytes().to_vec();
if bucket_index < 256 {
let byte_index = (255 - bucket_index) / 8;
let bit_index = (255 - bucket_index) % 8;
if byte_index < key_bytes.len() {
key_bytes[byte_index] ^= 1 << bit_index;
}
}
let mut hash = [0u8; 32];
hash.copy_from_slice(&key_bytes);
Key::from_hash(hash)
}
/// Iterative Kademlia node lookup: like `iterative_find_value`, but issues
/// FindNode queries and returns the closest nodes accumulated (up to
/// `replication_factor`) instead of a record.
async fn iterative_find_node(&self, key: &Key) -> Vec<DHTNode> {
debug!("Starting iterative node lookup for key {}", key.to_hex());
let mut lookup_state = LookupState::new(key.clone(), self.config.alpha);
// Seed the frontier from our own routing table.
let initial_nodes = self.routing_table.closest_nodes(key, self.config.alpha).await;
lookup_state.add_nodes(initial_nodes);
let mut iterations = 0;
const MAX_ITERATIONS: usize = 10;
while !lookup_state.is_complete() && iterations < MAX_ITERATIONS {
let nodes_to_query = lookup_state.next_nodes();
if nodes_to_query.is_empty() {
break;
}
// Fire this round's queries concurrently.
let mut queries = Vec::new();
for node in &nodes_to_query {
let query = DHTQuery::FindNode {
key: key.clone(),
requester: self.local_id.to_hex()
};
queries.push(self.simulate_query(node, query));
}
for query_result in futures::future::join_all(queries).await {
if let Ok(DHTResponse::Nodes { nodes }) = query_result {
let dht_nodes: Vec<DHTNode> = nodes.into_iter()
.map(|n| n.to_dht_node())
.collect();
lookup_state.add_nodes(dht_nodes);
}
}
iterations += 1;
}
debug!("Iterative node lookup for key {} completed after {} iterations",
key.to_hex(), iterations);
lookup_state.closest.into_iter()
.take(self.config.replication_factor)
.collect()
}
/// Query the replica set for `key` and compare the copies.
///
/// The first record returned becomes the "canonical" one; any copy that
/// differs in value, creation time, or publisher is recorded as a conflict
/// and flips `consistent` to false.
pub async fn check_consistency(&self, key: &Key) -> Result<ConsistencyReport> {
debug!("Checking consistency for key {}", key.to_hex());
let closest_nodes = self.routing_table
.closest_nodes(key, self.config.replication_factor)
.await;
let mut records_found = Vec::new();
let mut nodes_queried = 0;
let mut nodes_responded = 0;
for node in &closest_nodes {
nodes_queried += 1;
let query = DHTQuery::FindValue {
key: key.clone(),
requester: self.local_id.to_hex()
};
match self.simulate_query(node, query).await {
Ok(DHTResponse::Value { record }) => {
nodes_responded += 1;
records_found.push((node.peer_id.clone(), record));
}
// A Nodes answer counts as a response but not as a stored copy.
Ok(DHTResponse::Nodes { .. }) => {
nodes_responded += 1;
}
_ => {
}
}
}
let mut consistent = true;
let mut canonical_record: Option<Record> = None;
let mut conflicts = Vec::new();
for (node_id, record) in &records_found {
if let Some(ref canonical) = canonical_record {
if record.value != canonical.value ||
record.created_at != canonical.created_at ||
record.publisher != canonical.publisher {
consistent = false;
conflicts.push((node_id.clone(), record.clone()));
}
} else {
// First copy seen establishes the baseline.
canonical_record = Some(record.clone());
}
}
let report = ConsistencyReport {
key: key.clone(),
nodes_queried,
nodes_responded,
records_found: records_found.len(),
consistent,
canonical_record,
conflicts,
replication_factor: self.config.replication_factor,
};
debug!("Consistency check for key {}: {} nodes queried, {} responded, {} records found, consistent: {}",
key.to_hex(), report.nodes_queried, report.nodes_responded,
report.records_found, report.consistent);
Ok(report)
}
/// Repairs an inconsistent record by re-replicating the most recently
/// created version to the k closest nodes. Consistent keys (or keys with
/// no records at all) are reported without attempting repairs.
pub async fn repair_record(&self, key: &Key) -> Result<RepairResult> {
    debug!("Starting repair for key {}", key.to_hex());
    let report = self.check_consistency(key).await?;
    // Helper for the two early-exit cases where nothing is repaired.
    let no_repair = |state: &str| RepairResult {
        key: key.clone(),
        repairs_needed: false,
        repairs_attempted: 0,
        repairs_successful: 0,
        final_state: state.to_string(),
    };
    if report.consistent {
        return Ok(no_repair("consistent"));
    }
    let canonical = match report.canonical_record {
        Some(record) => record,
        None => return Ok(no_repair("no_records_found")),
    };
    // Prefer the most recently created record among canonical + conflicts
    // (ties keep the earlier candidate, i.e. the canonical one).
    let most_recent = report
        .conflicts
        .iter()
        .map(|(_, record)| record)
        .fold(canonical, |best, candidate| {
            if candidate.created_at > best.created_at {
                candidate.clone()
            } else {
                best
            }
        });
    let targets = self
        .routing_table
        .closest_nodes(key, self.config.replication_factor)
        .await;
    let repairs_attempted = targets.len();
    let mut repairs_successful = 0;
    for node in &targets {
        if self.replicate_record(&most_recent, node).await.is_ok() {
            repairs_successful += 1;
        }
    }
    // A majority of the replication factor counts as a successful repair.
    let final_state = if repairs_successful >= (self.config.replication_factor / 2) {
        "repaired".to_string()
    } else {
        "repair_failed".to_string()
    };
    debug!("Repair for key {} completed: {}/{} repairs successful, final state: {}",
        key.to_hex(), repairs_successful, repairs_attempted, final_state);
    Ok(RepairResult {
        key: key.clone(),
        repairs_needed: true,
        repairs_attempted,
        repairs_successful,
        final_state,
    })
}
/// Creates a new inbox owned by `owner_peer_id`: stores a metadata record
/// and an empty message-index record in the DHT, then returns a summary.
///
/// # Errors
/// Returns `P2PError::DHT` if serialization or local storage fails.
pub async fn create_inbox(&self, inbox_id: &str, owner_peer_id: PeerId) -> Result<InboxInfo> {
    info!("Creating inbox {} for peer {}", inbox_id, owner_peer_id);
    let inbox_key = Key::from_inbox_id(inbox_id);
    // "Never expires" sentinel: ~100 years in the future. The previous
    // expression `SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX)`
    // overflows SystemTime's platform representation and panics at runtime
    // (`Add<Duration> for SystemTime` panics on overflow).
    let never_expires = SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60);
    let inbox_metadata = InboxMetadata {
        inbox_id: inbox_id.to_string(),
        owner: owner_peer_id.clone(),
        created_at: SystemTime::now(),
        message_count: 0,
        max_messages: 1000, // default per-inbox capacity
        is_public: true,
        access_keys: vec![owner_peer_id.clone()],
    };
    let metadata_value = serde_json::to_vec(&inbox_metadata)
        .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox metadata: {}", e)))?;
    let metadata_record = Record {
        key: inbox_key.clone(),
        value: metadata_value,
        publisher: owner_peer_id.clone(),
        created_at: SystemTime::now(),
        expires_at: never_expires,
        signature: None,
    };
    self.put_record_with_infinite_ttl(metadata_record).await?;
    // Each inbox also gets an (initially empty) message index record.
    let index_key = Key::from_inbox_index(inbox_id);
    let empty_index = InboxMessageIndex {
        inbox_id: inbox_id.to_string(),
        messages: Vec::new(),
        last_updated: SystemTime::now(),
    };
    let index_value = serde_json::to_vec(&empty_index)
        .map_err(|e| P2PError::DHT(format!("Failed to serialize inbox index: {}", e)))?;
    let index_record = Record {
        key: index_key,
        value: index_value,
        publisher: owner_peer_id.clone(),
        created_at: SystemTime::now(),
        expires_at: never_expires,
        signature: None,
    };
    self.put_record_with_infinite_ttl(index_record).await?;
    let inbox_info = InboxInfo {
        inbox_id: inbox_id.to_string(),
        three_word_address: self.generate_three_word_address(inbox_id),
        owner: owner_peer_id,
        created_at: SystemTime::now(),
        message_count: 0,
        is_accessible: true,
    };
    info!("Successfully created inbox {} with three-word address: {}",
        inbox_id, inbox_info.three_word_address);
    Ok(inbox_info)
}
/// Delivers `message` to an existing inbox: stores the message under its own
/// key, appends a reference to the inbox's message index, and increments the
/// inbox metadata's message count.
///
/// # Errors
/// Returns `P2PError::DHT` if the inbox or its index is missing, the inbox is
/// full, or (de)serialization / storage fails.
pub async fn send_message_to_inbox(&self, inbox_id: &str, message: InboxMessage) -> Result<()> {
    info!("Sending message to inbox {}", inbox_id);
    let inbox_key = Key::from_inbox_id(inbox_id);
    let metadata_record = self.get(&inbox_key).await
        .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
    let mut inbox_metadata: InboxMetadata = serde_json::from_slice(&metadata_record.value)
        .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
    // Reject sends once the inbox has reached its configured capacity.
    if inbox_metadata.message_count >= inbox_metadata.max_messages {
        return Err(P2PError::DHT(format!("Inbox {} is full", inbox_id)));
    }
    // "Never expires" sentinel: ~100 years ahead. The previous
    // `SystemTime::UNIX_EPOCH + Duration::from_secs(u64::MAX)` overflows
    // SystemTime's platform representation and panics, so use a far-future
    // time that is always representable.
    let never_expires = SystemTime::now() + Duration::from_secs(100 * 365 * 24 * 60 * 60);
    // Store the message itself under its own key.
    let message_key = Key::from_inbox_message(inbox_id, &message.id);
    let message_value = serde_json::to_vec(&message)
        .map_err(|e| P2PError::DHT(format!("Failed to serialize message: {}", e)))?;
    let message_record = Record {
        key: message_key,
        value: message_value,
        publisher: message.sender.clone(),
        created_at: message.timestamp,
        expires_at: never_expires,
        signature: None,
    };
    self.put_record_with_infinite_ttl(message_record).await?;
    // Append a reference to the inbox's index and bump the message count.
    // NOTE(review): this read-modify-write of index + metadata is not atomic;
    // concurrent senders could lose updates. Confirm whether callers serialize
    // access to an inbox.
    let index_key = Key::from_inbox_index(inbox_id);
    let index_record = self.get(&index_key).await
        .ok_or_else(|| P2PError::DHT(format!("Inbox index {} not found", inbox_id)))?;
    let mut message_index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
        .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
    message_index.messages.push(MessageRef {
        message_id: message.id.clone(),
        sender: message.sender.clone(),
        timestamp: message.timestamp,
        message_type: message.message_type.clone(),
    });
    message_index.last_updated = SystemTime::now();
    inbox_metadata.message_count += 1;
    let updated_index_value = serde_json::to_vec(&message_index)
        .map_err(|e| P2PError::DHT(format!("Failed to serialize updated index: {}", e)))?;
    let updated_metadata_value = serde_json::to_vec(&inbox_metadata)
        .map_err(|e| P2PError::DHT(format!("Failed to serialize updated metadata: {}", e)))?;
    let updated_index_record = Record {
        key: index_key,
        value: updated_index_value,
        publisher: message.sender.clone(),
        created_at: SystemTime::now(),
        expires_at: never_expires,
        signature: None,
    };
    let updated_metadata_record = Record {
        key: inbox_key,
        value: updated_metadata_value,
        publisher: message.sender.clone(),
        created_at: SystemTime::now(),
        expires_at: never_expires,
        signature: None,
    };
    self.put_record_with_infinite_ttl(updated_index_record).await?;
    self.put_record_with_infinite_ttl(updated_metadata_record).await?;
    info!("Successfully sent message {} to inbox {}", message.id, inbox_id);
    Ok(())
}
/// Fetches messages from an inbox, newest first. With `limit`, only the most
/// recent `limit` entries of the index are resolved; messages whose records
/// are missing or undecodable are silently skipped.
pub async fn get_inbox_messages(&self, inbox_id: &str, limit: Option<usize>) -> Result<Vec<InboxMessage>> {
    info!("Retrieving messages from inbox {}", inbox_id);
    let index_key = Key::from_inbox_index(inbox_id);
    let index_record = self
        .get(&index_key)
        .await
        .ok_or_else(|| P2PError::DHT(format!("Inbox {} not found", inbox_id)))?;
    let index: InboxMessageIndex = serde_json::from_slice(&index_record.value)
        .map_err(|e| P2PError::DHT(format!("Failed to deserialize message index: {}", e)))?;
    // The index is append-only, so the newest references are at the back;
    // when limited, take from the reversed iterator.
    let selected: Vec<&MessageRef> = match limit {
        Some(n) => index.messages.iter().rev().take(n).collect(),
        None => index.messages.iter().collect(),
    };
    let mut messages = Vec::with_capacity(selected.len());
    for reference in selected {
        let message_key = Key::from_inbox_message(inbox_id, &reference.message_id);
        if let Some(record) = self.get(&message_key).await {
            if let Ok(message) = serde_json::from_slice::<InboxMessage>(&record.value) {
                messages.push(message);
            }
        }
    }
    // Present newest messages first.
    messages.sort_by(|a, b| b.timestamp.cmp(&a.timestamp));
    info!("Retrieved {} messages from inbox {}", messages.len(), inbox_id);
    Ok(messages)
}
/// Looks up an inbox's metadata record and, if present, returns a summary.
/// `Ok(None)` means the inbox does not exist; a present-but-undecodable
/// record is an error.
pub async fn get_inbox_info(&self, inbox_id: &str) -> Result<Option<InboxInfo>> {
    let inbox_key = Key::from_inbox_id(inbox_id);
    let record = match self.get(&inbox_key).await {
        Some(record) => record,
        None => return Ok(None),
    };
    let metadata: InboxMetadata = serde_json::from_slice(&record.value)
        .map_err(|e| P2PError::DHT(format!("Failed to deserialize inbox metadata: {}", e)))?;
    Ok(Some(InboxInfo {
        inbox_id: inbox_id.to_string(),
        three_word_address: self.generate_three_word_address(inbox_id),
        owner: metadata.owner,
        created_at: metadata.created_at,
        message_count: metadata.message_count,
        is_accessible: true,
    }))
}
/// Stores `record` locally, then best-effort replicates it to the k closest
/// peers. Replication failures are logged but never fail the call; only a
/// local storage error is propagated.
async fn put_record_with_infinite_ttl(&self, record: Record) -> Result<()> {
    self.storage.store(record.clone()).await?;
    let replicas = self
        .routing_table
        .closest_nodes(&record.key, self.config.replication_factor)
        .await;
    for peer in &replicas {
        if let Err(e) = self.replicate_record(&record, peer).await {
            debug!("Failed to replicate infinite TTL record to node {}: {}", peer.peer_id, e);
        }
    }
    Ok(())
}
/// Derives a human-readable three-word address for an inbox by word-encoding
/// a synthetic multiaddr; falls back to a loopback address if the synthetic
/// form does not parse, and to an `inbox.<prefix>.messages` name if the
/// encoder itself fails.
fn generate_three_word_address(&self, inbox_id: &str) -> String {
    use crate::bootstrap::words::WordEncoder;
    let addr = format!("/inbox/{}/dht", inbox_id)
        .parse()
        .unwrap_or_else(|_| "/ip6/::1/udp/9000/quic".parse().unwrap());
    match WordEncoder::new().encode_multiaddr(&addr) {
        Ok(words) => words.to_string(),
        Err(_) => format!("inbox.{}.messages", inbox_id.chars().take(8).collect::<String>()),
    }
}
}
/// Snapshot of the local DHT node's state counters.
#[derive(Debug, Clone)]
pub struct DHTStats {
/// Key identifying the local node in the DHT keyspace.
pub local_id: Key,
/// Total number of known nodes — presumably summed over routing-table buckets; confirm against the producer.
pub total_nodes: usize,
/// Number of routing-table buckets that currently hold at least one node — assumption; confirm against the producer.
pub active_buckets: usize,
/// Number of records held in local storage.
pub stored_records: usize,
/// Number of locally stored records that are past their expiry.
pub expired_records: usize,
}
/// Result of `check_consistency`: how the replicas of one key agree.
#[derive(Debug, Clone)]
pub struct ConsistencyReport {
/// The key that was checked.
pub key: Key,
/// Number of closest nodes that were queried.
pub nodes_queried: usize,
/// Nodes that returned any response (a value or a closer-nodes reply).
pub nodes_responded: usize,
/// Number of value records retrieved across all responders.
pub records_found: usize,
/// True when no retrieved record disagreed with the canonical one.
pub consistent: bool,
/// First record retrieved; used as the comparison baseline.
pub canonical_record: Option<Record>,
/// Records (with the peer that held each) differing from the canonical record.
pub conflicts: Vec<(PeerId, Record)>,
/// Replication factor configured when the check ran.
pub replication_factor: usize,
}
/// Outcome of `repair_record` for a single key.
#[derive(Debug, Clone)]
pub struct RepairResult {
/// The key that was examined.
pub key: Key,
/// False when the key was already consistent (or had no records at all).
pub repairs_needed: bool,
/// Number of nodes a re-replication was attempted against.
pub repairs_attempted: usize,
/// Number of re-replications that succeeded.
pub repairs_successful: usize,
/// One of "consistent", "no_records_found", "repaired", or "repair_failed".
pub final_state: String,
}
impl LookupState {
    /// Creates a lookup toward `target`, issuing up to `alpha` parallel
    /// queries per round.
    pub fn new(target: Key, alpha: usize) -> Self {
        Self {
            target,
            queried: HashMap::new(),
            to_query: VecDeque::new(),
            closest: Vec::new(),
            started_at: Instant::now(),
            alpha,
        }
    }
    /// Registers newly discovered nodes.
    ///
    /// Nodes that are neither already queried nor already pending are queued
    /// for querying, and every discovered node is additionally recorded
    /// (deduplicated by peer id) in `closest`, which is what the lookup
    /// ultimately returns. Previously `closest` was never populated, so
    /// lookups always produced an empty result set, and nodes already in
    /// `to_query` could be enqueued (and thus queried) twice.
    pub fn add_nodes(&mut self, nodes: Vec<DHTNode>) {
        for node in nodes {
            let pending = self
                .to_query
                .iter()
                .any(|queued| queued.peer_id == node.peer_id);
            if !self.queried.contains_key(&node.peer_id) && !pending {
                self.to_query.push_back(node.clone());
            }
            if !self.closest.iter().any(|seen| seen.peer_id == node.peer_id) {
                self.closest.push(node);
            }
        }
        // Keep both collections ordered by distance to the target.
        let target = &self.target;
        self.to_query
            .make_contiguous()
            .sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
        self.closest
            .sort_by_key(|node| node.key().distance(target).as_bytes().to_vec());
    }
    /// Pops up to `alpha` nodes for the next query round, marking each as queried.
    pub fn next_nodes(&mut self) -> Vec<DHTNode> {
        let mut batch = Vec::with_capacity(self.alpha);
        while batch.len() < self.alpha {
            match self.to_query.pop_front() {
                Some(node) => {
                    self.queried.insert(node.peer_id.clone(), Instant::now());
                    batch.push(node);
                }
                None => break,
            }
        }
        batch
    }
    /// The lookup is complete when nothing is left to query or a 30s deadline passed.
    pub fn is_complete(&self) -> bool {
        self.to_query.is_empty() || self.started_at.elapsed() > Duration::from_secs(30)
    }
}
/// Durable metadata record describing an inbox, stored (as JSON) under
/// `Key::from_inbox_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMetadata {
/// Human-readable inbox identifier.
pub inbox_id: String,
/// Peer that created and owns the inbox.
pub owner: PeerId,
/// Creation time of the inbox.
pub created_at: SystemTime,
/// Number of messages currently recorded in the inbox.
pub message_count: usize,
/// Capacity limit; sends fail once `message_count` reaches this value.
pub max_messages: usize,
/// Whether the inbox is public — set at creation but not enforced in this file; confirm with readers.
pub is_public: bool,
/// Peers granted access to the inbox (initially just the owner).
pub access_keys: Vec<PeerId>,
}
/// A single message delivered to an inbox, stored (as JSON) under
/// `Key::from_inbox_message`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessage {
/// Unique message identifier; also used to derive the message's DHT key.
pub id: String,
/// Peer that sent the message.
pub sender: PeerId,
/// Identifier of the destination inbox.
pub recipient_inbox: String,
/// Message body.
pub content: String,
/// Free-form type tag carried alongside the message.
pub message_type: String,
/// Sender-supplied timestamp; retrieval sorts newest-first by this field.
pub timestamp: SystemTime,
/// Attachment descriptors; the attachment content itself is not stored here.
pub attachments: Vec<MessageAttachment>,
}
/// Descriptor for a message attachment. Only metadata is stored; the payload
/// is presumably fetched separately by `hash` — confirm with the transfer path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageAttachment {
/// Original file name of the attachment.
pub filename: String,
/// MIME content type.
pub content_type: String,
/// Size of the attachment in bytes.
pub size: u64,
/// Content hash identifying the attachment data.
pub hash: String,
}
/// Append-only index of an inbox's messages, stored (as JSON) under
/// `Key::from_inbox_index`; lets readers enumerate messages without scanning keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessageIndex {
/// Inbox this index belongs to.
pub inbox_id: String,
/// References to stored messages, in insertion (oldest-first) order.
pub messages: Vec<MessageRef>,
/// When the index was last modified.
pub last_updated: SystemTime,
}
/// Lightweight index entry pointing at a full message record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageRef {
/// Identifier of the referenced message (derives its DHT key).
pub message_id: String,
/// Peer that sent the referenced message.
pub sender: PeerId,
/// Timestamp copied from the message at send time.
pub timestamp: SystemTime,
/// Type tag copied from the message at send time.
pub message_type: String,
}
/// Caller-facing summary of an inbox, returned by `create_inbox` and
/// `get_inbox_info`; not itself stored in the DHT.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxInfo {
/// Human-readable inbox identifier.
pub inbox_id: String,
/// Word-encoded address derived from the inbox id.
pub three_word_address: String,
/// Peer that owns the inbox.
pub owner: PeerId,
/// Creation time reported for the inbox.
pub created_at: SystemTime,
/// Number of messages in the inbox at the time of the query.
pub message_count: usize,
/// Always true in this file's producers; semantics of "inaccessible" unclear — confirm.
pub is_accessible: bool,
}
impl Key {
    /// Key under which an inbox's metadata record is stored.
    pub fn from_inbox_id(inbox_id: &str) -> Self {
        Self::hash_parts(&[&b"INBOX_METADATA:"[..], inbox_id.as_bytes()])
    }
    /// Key under which an inbox's message index record is stored.
    pub fn from_inbox_index(inbox_id: &str) -> Self {
        Self::hash_parts(&[&b"INBOX_INDEX:"[..], inbox_id.as_bytes()])
    }
    /// Key under which an individual inbox message record is stored.
    pub fn from_inbox_message(inbox_id: &str, message_id: &str) -> Self {
        Self::hash_parts(&[
            &b"INBOX_MESSAGE:"[..],
            inbox_id.as_bytes(),
            &b":"[..],
            message_id.as_bytes(),
        ])
    }
    /// SHA-256 over the concatenation of `parts`. Feeding the hasher
    /// incrementally hashes exactly the same bytes as hashing one
    /// concatenated buffer, so the first part acts as a domain separator.
    fn hash_parts(parts: &[&[u8]]) -> Self {
        let mut hasher = Sha256::new();
        for part in parts {
            hasher.update(part);
        }
        Key { hash: hasher.finalize().into() }
    }
}