# DAA Prime DHT

High-performance Kademlia-based distributed hash table implementation for the Prime distributed machine learning framework. Provides decentralized storage and discovery for ML models, gradients, and metadata.
## Overview
DAA Prime DHT implements a robust distributed hash table based on the Kademlia protocol, specifically optimized for machine learning workloads. It provides:
- **Distributed Storage**: Scalable key-value storage across network nodes
- **Efficient Routing**: Kademlia routing table for O(log N) lookups
- **Fault Tolerance**: Built-in replication and failure recovery
- **ML Optimization**: Specialized for storing ML models and gradients
- **libp2p Integration**: Seamless networking with a modern P2P stack
## Features

- 🔄 **Kademlia Protocol**: Industry-standard DHT with proven scalability
- 🚀 **High Performance**: Optimized for ML data patterns and sizes
- 🛡️ **Fault Tolerant**: Configurable replication and self-healing
- 🔍 **Efficient Discovery**: Fast peer and content discovery
- 📊 **Property Testing**: Comprehensive test coverage with QuickCheck/PropTest
- 🌐 **Network Agnostic**: Works with any libp2p transport
## Installation

Add this to your `Cargo.toml`:

```toml
[dependencies]
daa-prime-dht = "0.2.1"
daa-prime-core = "0.2.1"
libp2p = "0.53"
```
## Quick Start

### Basic DHT Operations

```rust
use daa_prime_dht::{Dht, DhtConfig};
use libp2p::PeerId;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a DHT node with a random peer identity
    let peer_id = PeerId::random();
    let config = DhtConfig::default();
    let dht = Dht::new(peer_id, config);

    // Store a key/value pair
    let key = b"model_weights_v1".to_vec();
    let value = b"serialized_model_data".to_vec();
    dht.put(key.clone(), value.clone()).await?;

    // Look it up again
    if let Some(retrieved) = dht.get(key).await? {
        println!("Retrieved: {}", String::from_utf8_lossy(&retrieved));
    }

    Ok(())
}
```
### Custom Configuration

```rust
use daa_prime_dht::DhtConfig;
use std::time::Duration;

let config = DhtConfig {
    k_bucket_size: 20,                           // max peers per k-bucket
    alpha: 3,                                    // parallel queries per lookup
    replication_factor: 3,                       // copies stored per key
    refresh_interval: Duration::from_secs(3600), // bucket refresh period
    ttl: Duration::from_secs(86400),             // how long entries live
};
```
## Core Concepts

### Kademlia Routing

The DHT uses Kademlia's XOR metric for routing:

```rust
use daa_prime_dht::routing::RoutingTable;
use libp2p::PeerId;

let peer_id = PeerId::random();
let routing_table = RoutingTable::new(peer_id, 20);

// Find the 3 closest known nodes to a target (by XOR distance)
let target = PeerId::random();
let closest_nodes = routing_table.find_closest(&target, 3);
```
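To make the XOR metric concrete, here is a standalone sketch, in plain Rust and independent of the crate's types, of how XOR distance and the resulting k-bucket index are computed. The 8-byte IDs and both function names are illustrative assumptions; real Kademlia IDs are much wider, but the rule is the same: a peer lands in the bucket of the highest bit where its ID differs from ours.

```rust
/// XOR distance between two simplified 8-byte node IDs.
fn xor_distance(a: &[u8; 8], b: &[u8; 8]) -> [u8; 8] {
    let mut out = [0u8; 8];
    for i in 0..8 {
        out[i] = a[i] ^ b[i];
    }
    out
}

/// Index of the k-bucket a peer falls into: the position of the highest
/// set bit of the XOR distance. IDs sharing a longer common prefix land
/// in lower-index buckets, which is what yields O(log N) routing.
fn bucket_index(distance: &[u8; 8]) -> Option<u32> {
    for (i, byte) in distance.iter().enumerate() {
        if *byte != 0 {
            // (7 - i) * 8 bits below this byte, plus the bit within it
            return Some((7 - i as u32) * 8 + (7 - byte.leading_zeros()));
        }
    }
    None // zero distance: a node never stores itself in a bucket
}

fn main() {
    let me = [0u8; 8];
    let peer = [0, 0, 0, 0, 0, 0, 0, 5]; // differs only in the low bits
    let d = xor_distance(&me, &peer);
    println!("distance = {:?}, bucket = {:?}", d, bucket_index(&d));
}
```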
### Storage Management

```rust
use daa_prime_dht::storage::Storage;

let mut storage = Storage::new();
storage.put(b"key".to_vec(), b"value".to_vec());

if let Some(value) = storage.get(&b"key".to_vec()) {
    println!("Found: {:?}", value);
}
```
### Peer Discovery

```rust
use daa_prime_dht::discovery::Discovery;

let discovery = Discovery::new();
discovery.start_discovery().await?;
let peers = discovery.get_peers().await;
```
## Advanced Usage

### ML Model Storage

```rust
use daa_prime_dht::Dht;
use daa_prime_core::ModelMetadata;

async fn store_model(
    dht: &Dht,
    model_id: &str,
    model_data: Vec<u8>,
    metadata: ModelMetadata,
) -> Result<(), Box<dyn std::error::Error>> {
    // Store weights and metadata under separate namespaced keys
    let model_key = format!("model:{}", model_id).into_bytes();
    dht.put(model_key, model_data).await?;

    let meta_key = format!("meta:{}", model_id).into_bytes();
    let meta_data = serde_json::to_vec(&metadata)?;
    dht.put(meta_key, meta_data).await?;

    Ok(())
}

async fn load_model(
    dht: &Dht,
    model_id: &str,
) -> Result<Option<(Vec<u8>, ModelMetadata)>, Box<dyn std::error::Error>> {
    let model_key = format!("model:{}", model_id).into_bytes();
    let meta_key = format!("meta:{}", model_id).into_bytes();

    let model_data = dht.get(model_key).await?;
    let meta_data = dht.get(meta_key).await?;

    // Only report success when both halves were found
    match (model_data, meta_data) {
        (Some(model), Some(meta)) => {
            let metadata: ModelMetadata = serde_json::from_slice(&meta)?;
            Ok(Some((model, metadata)))
        }
        _ => Ok(None),
    }
}
```
### Gradient Aggregation

```rust
use daa_prime_dht::Dht;
use daa_prime_core::{GradientUpdate, NodeId};

async fn store_gradients(
    dht: &Dht,
    round: u64,
    updates: Vec<GradientUpdate>,
) -> Result<(), Box<dyn std::error::Error>> {
    for update in updates {
        let key = format!("gradient:{}:{}", round, update.node_id.0).into_bytes();
        let data = serde_json::to_vec(&update)?;
        dht.put(key, data).await?;
    }
    Ok(())
}

async fn collect_gradients(
    dht: &Dht,
    round: u64,
    node_ids: Vec<NodeId>,
) -> Result<Vec<GradientUpdate>, Box<dyn std::error::Error>> {
    let mut gradients = Vec::new();
    for node_id in node_ids {
        let key = format!("gradient:{}:{}", round, node_id.0).into_bytes();
        if let Some(data) = dht.get(key).await? {
            let update: GradientUpdate = serde_json::from_slice(&data)?;
            gradients.push(update);
        }
    }
    Ok(gradients)
}
```
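The functions above only store and collect updates; the aggregation step itself is typically an element-wise mean over the collected gradients. Here is a minimal, self-contained sketch over plain `f32` vectors. The `average_gradients` helper is an illustration, not part of the crate's API, and the real `GradientUpdate` type carries more structure than a bare vector.

```rust
/// Element-wise mean of a set of gradient vectors (illustrative only).
/// Returns `None` for an empty set, since there is nothing to average.
fn average_gradients(updates: &[Vec<f32>]) -> Option<Vec<f32>> {
    let first = updates.first()?;
    let n = updates.len() as f32;
    let mut sum = vec![0.0f32; first.len()];

    for update in updates {
        assert_eq!(update.len(), sum.len(), "gradient dimensions must match");
        for (acc, g) in sum.iter_mut().zip(update) {
            *acc += g;
        }
    }
    for acc in &mut sum {
        *acc /= n; // divide the accumulated sums by the number of updates
    }
    Some(sum)
}

fn main() {
    let updates = vec![vec![1.0, 2.0], vec![3.0, 4.0]];
    println!("{:?}", average_gradients(&updates));
}
```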
### Custom Storage Backends

```rust
use daa_prime_dht::storage::StorageBackend;
use async_trait::async_trait;

struct RedisStorage {
    client: redis::Client,
}

// `Error` below is whatever error type the `StorageBackend` trait specifies
#[async_trait]
impl StorageBackend for RedisStorage {
    async fn put(&self, key: Vec<u8>, value: Vec<u8>) -> Result<(), Error> {
        let mut conn = self.client.get_async_connection().await?;
        redis::cmd("SET")
            .arg(key)
            .arg(value)
            .query_async::<_, ()>(&mut conn) // annotate: SET's reply is discarded
            .await?;
        Ok(())
    }

    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
        let mut conn = self.client.get_async_connection().await?;
        let result: Option<Vec<u8>> = redis::cmd("GET")
            .arg(key)
            .query_async(&mut conn)
            .await?;
        Ok(result)
    }
}
```
## Configuration Options

### DHT Parameters

```rust
use daa_prime_dht::DhtConfig;
use std::time::Duration;

let config = DhtConfig {
    k_bucket_size: 20,                           // max peers stored per k-bucket
    alpha: 3,                                    // lookup concurrency
    replication_factor: 3,                       // replicas kept per key
    ttl: Duration::from_secs(86400),             // entry lifetime (24 h)
    refresh_interval: Duration::from_secs(3600), // bucket refresh period (1 h)
};
```
### Performance Tuning

```rust
// Throughput-oriented: wider buckets, more parallel queries, more replicas
let high_perf_config = DhtConfig {
    k_bucket_size: 32,
    alpha: 8,
    replication_factor: 5,
    refresh_interval: Duration::from_secs(1800),
    ttl: Duration::from_secs(172800),
};

// Lightweight: smaller routing state and fewer replicas to maintain
let low_latency_config = DhtConfig {
    k_bucket_size: 10,
    alpha: 2,
    replication_factor: 2,
    refresh_interval: Duration::from_secs(7200),
    ttl: Duration::from_secs(43200),
};
```
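When choosing between profiles like these, a rough mental model helps: each lookup hop roughly halves the remaining XOR distance to the target, so expected hop count grows with log2(network_size / k_bucket_size). The estimator below is an assumption-laden back-of-the-envelope sketch for building tuning intuition, not part of the crate's API:

```rust
// Rough lookup-hop estimate: each hop ~halves the remaining XOR distance,
// and a node already knows everything inside its own buckets.
fn estimated_hops(network_size: u64, k_bucket_size: u64) -> u32 {
    if network_size <= k_bucket_size {
        return 1; // the whole network fits in one bucket
    }
    (network_size as f64 / k_bucket_size as f64).log2().ceil() as u32 + 1
}

fn main() {
    // Larger buckets shave hops off lookups at the cost of routing state
    for &n in &[1_000u64, 100_000, 10_000_000] {
        println!(
            "n = {:>10}: k=10 -> {} hops, k=32 -> {} hops",
            n,
            estimated_hops(n, 10),
            estimated_hops(n, 32)
        );
    }
}
```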
## Testing

The crate includes comprehensive property-based testing:

```rust
use daa_prime_dht::{Dht, DhtConfig};
use libp2p::PeerId;
use proptest::prelude::*;
use quickcheck_macros::quickcheck;
use std::time::Duration;

proptest! {
    #[test]
    fn test_dht_config_validation(
        k_bucket in 5usize..100usize,
        alpha in 1usize..5usize, // keep alpha below the minimum bucket size
        replication in 1usize..20usize,
    ) {
        let config = DhtConfig {
            k_bucket_size: k_bucket,
            alpha,
            replication_factor: replication,
            refresh_interval: Duration::from_secs(3600),
            ttl: Duration::from_secs(86400),
        };
        prop_assert!(config.alpha <= config.k_bucket_size);
        prop_assert!(config.replication_factor >= 1);
    }
}

// quickcheck cannot drive async functions directly, so run the future
// on a Tokio runtime inside a synchronous property
#[quickcheck]
fn test_put_get_consistency(key: Vec<u8>, value: Vec<u8>) -> bool {
    if key.is_empty() || value.is_empty() {
        return true; // skip degenerate inputs
    }
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        let dht = Dht::new(PeerId::random(), DhtConfig::default());
        dht.put(key.clone(), value.clone()).await.is_ok()
            && dht.get(key).await.unwrap() == Some(value)
    })
}
```
## Performance Benchmarks

```rust
#[cfg(test)]
mod benchmarks {
    use criterion::{black_box, criterion_group, criterion_main, Criterion};
    use daa_prime_dht::{Dht, DhtConfig};
    use libp2p::PeerId;

    fn bench_put_operations(c: &mut Criterion) {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let peer_id = PeerId::random();
        let dht = Dht::new(peer_id, DhtConfig::default());

        c.bench_function("dht_put_1kb", |b| {
            b.to_async(&rt).iter(|| async {
                // Random keys avoid overwriting the same entry every iteration
                let key = black_box(format!("key_{}", fastrand::u64(..)).into_bytes());
                let value = black_box(vec![0u8; 1024]);
                dht.put(key, value).await.unwrap();
            });
        });
    }

    criterion_group!(benches, bench_put_operations);
    criterion_main!(benches);
}
```
## Monitoring and Metrics

```rust
use daa_prime_dht::Dht;

async fn monitor_dht_health(dht: &Dht) {
    let routing_table = dht.routing_table.read().await;
    let storage = dht.storage.read().await;

    println!("DHT Health Report:");
    println!("  Active peers: {}", routing_table.len());
    println!("  Stored keys: {}", storage.len());
    println!("  Bucket distribution: {:?}", routing_table.bucket_sizes());
}
```
## Integration Examples

### With DAA Orchestrator

```rust
use daa_prime_dht::{Dht, DhtConfig};
use libp2p::PeerId;

async fn setup_orchestrated_dht() -> Result<(), Box<dyn std::error::Error>> {
    let peer_id = PeerId::random();
    let _dht = Dht::new(peer_id, DhtConfig::default());
    // Hand the DHT to the orchestrator from here
    Ok(())
}
```
### With Prime Trainer

```rust
use daa_prime_dht::{Dht, DhtConfig};
use daa_prime_trainer::TrainerNode;
use libp2p::PeerId;

async fn trainer_with_dht() -> Result<(), Box<dyn std::error::Error>> {
    let peer_id = PeerId::random();
    let dht = Dht::new(peer_id, DhtConfig::default());
    let _trainer = TrainerNode::new("trainer-001".to_string()).await?;

    // Share a training checkpoint through the DHT
    let checkpoint_key = b"checkpoint_epoch_10".to_vec();
    let checkpoint_data = b"model_state_dict".to_vec();
    dht.put(checkpoint_key, checkpoint_data).await?;

    Ok(())
}
```
## Troubleshooting

### Common Issues

- **Connection Failures**: make sure a complete libp2p transport stack (TCP + Noise + Yamux) is configured:

  ```rust
  use libp2p::{tcp, noise, yamux, Transport};

  let transport = tcp::tokio::Transport::default()
      .upgrade(libp2p::core::upgrade::Version::V1)
      .authenticate(noise::Config::new(&keypair)?)
      .multiplex(yamux::Config::default())
      .boxed();
  ```

- **High Memory Usage**: expire stale entries periodically:

  ```rust
  async fn cleanup_expired_data(dht: &Dht) {
      let mut storage = dht.storage.write().await;
      storage.cleanup_expired();
  }
  ```

- **Slow Lookups**: raise lookup parallelism:

  ```rust
  let config = DhtConfig {
      alpha: 8, // more concurrent queries per lookup
      ..Default::default()
  };
  ```
## Roadmap
## Contributing

Contributions are welcome! Please see our Contributing Guide for details.
## License

This project is licensed under the MIT License - see the LICENSE file for details.
## Related Crates

## References

## Support