use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::time::{Duration, SystemTime};
use crate::p2p::types::{Package, PackageUpdate, Query, SearchResult};
#[derive(Debug)]
pub struct MarketplaceBehaviour {
dht: DHTState,
gossip: GossipState,
request_response: RequestResponseState,
package_cache: HashMap<String, Package>,
peers: HashMap<String, PeerState>,
}
#[derive(Debug)]
struct DHTState {
local_packages: HashSet<String>,
provider_records: HashMap<String, Vec<ProviderRecord>>,
pending_queries: HashMap<String, Query>,
}
#[derive(Debug, Clone)]
struct ProviderRecord {
peer_id: String,
timestamp: SystemTime,
addresses: Vec<String>,
}
#[derive(Debug)]
struct GossipState {
topics: HashSet<String>,
subscriptions: HashMap<String, HashSet<String>>, message_cache: Vec<GossipMessage>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct GossipMessage {
id: String,
topic: String,
data: Vec<u8>,
timestamp: SystemTime,
}
#[derive(Debug)]
struct RequestResponseState {
pending_requests: HashMap<String, PendingRequest>,
}
#[derive(Debug, Clone)]
struct PendingRequest {
request_type: RequestType,
target_peer: String,
timestamp: SystemTime,
}
#[derive(Debug, Clone)]
enum RequestType {
GetPackage(String),
SearchPackages(Query),
GetMetadata(String),
}
#[derive(Debug, Clone)]
struct PeerState {
peer_id: String,
connected: bool,
addresses: Vec<String>,
packages: HashSet<String>,
last_seen: SystemTime,
}
impl MarketplaceBehaviour {
pub fn new() -> Self {
Self {
dht: DHTState {
local_packages: HashSet::new(),
provider_records: HashMap::new(),
pending_queries: HashMap::new(),
},
gossip: GossipState {
topics: HashSet::new(),
subscriptions: HashMap::new(),
message_cache: Vec::new(),
},
request_response: RequestResponseState {
pending_requests: HashMap::new(),
},
package_cache: HashMap::new(),
peers: HashMap::new(),
}
}
pub fn publish_package(&mut self, package: Package) -> Result<()> {
let package_id = package.id.clone();
self.package_cache.insert(package_id.clone(), package.clone());
self.dht.local_packages.insert(package_id.clone());
let update = PackageUpdate {
package_id: package_id.clone(),
version: package.version.clone(),
update_type: crate::p2p::types::UpdateType::NewPackage,
timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.map_err(|e| anyhow!("Time error: {}", e))?
.as_secs() as i64,
};
self.gossip_announce(update)?;
Ok(())
}
pub fn search_packages(&mut self, query: Query) -> Result<Vec<SearchResult>> {
let query_id = uuid::Uuid::new_v4().to_string();
self.dht.pending_queries.insert(query_id.clone(), query.clone());
let mut results: Vec<SearchResult> = self.package_cache
.values()
.filter(|pkg| pkg.matches_query(&query))
.map(|pkg| SearchResult {
package: pkg.clone(),
score: pkg.calculate_score(&query),
peer_count: self.count_providers(&pkg.id),
})
.collect();
results.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap_or(std::cmp::Ordering::Equal));
results.truncate(query.limit);
Ok(results)
}
pub fn get_package(&self, package_id: &str) -> Option<Package> {
self.package_cache.get(package_id).cloned()
}
pub fn subscribe_to_updates(&mut self, topic: &str) -> Result<()> {
self.gossip.topics.insert(topic.to_string());
Ok(())
}
pub fn handle_gossip_message(&mut self, topic: &str, data: Vec<u8>) -> Result<()> {
let update: PackageUpdate = serde_json::from_slice(&data)
.map_err(|e| anyhow!("Failed to parse update: {}", e))?;
match update.update_type {
crate::p2p::types::UpdateType::NewPackage => {
self.request_package(&update.package_id)?;
}
crate::p2p::types::UpdateType::NewVersion => {
if let Some(pkg) = self.package_cache.get_mut(&update.package_id) {
pkg.version = update.version;
pkg.updated_at = update.timestamp;
}
}
_ => {}
}
Ok(())
}
pub fn add_peer(&mut self, peer_id: String, addresses: Vec<String>) {
let peer_state = PeerState {
peer_id: peer_id.clone(),
connected: true,
addresses,
packages: HashSet::new(),
last_seen: SystemTime::now(),
};
self.peers.insert(peer_id, peer_state);
}
pub fn remove_peer(&mut self, peer_id: &str) {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.connected = false;
}
}
pub fn register_provider(&mut self, package_id: String, peer_id: String, addresses: Vec<String>) {
let record = ProviderRecord {
peer_id: peer_id.clone(),
timestamp: SystemTime::now(),
addresses,
};
self.dht.provider_records
.entry(package_id.clone())
.or_insert_with(Vec::new)
.push(record);
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.packages.insert(package_id);
}
}
pub fn get_providers(&self, package_id: &str) -> Vec<ProviderRecord> {
self.dht.provider_records
.get(package_id)
.cloned()
.unwrap_or_default()
}
fn count_providers(&self, package_id: &str) -> usize {
self.dht.provider_records
.get(package_id)
.map(|records| records.len())
.unwrap_or(0)
}
fn gossip_announce(&mut self, update: PackageUpdate) -> Result<()> {
let topic = format!("ggen-marketplace/updates");
let data = serde_json::to_vec(&update)
.map_err(|e| anyhow!("Failed to serialize update: {}", e))?;
let message = GossipMessage {
id: uuid::Uuid::new_v4().to_string(),
topic: topic.clone(),
data,
timestamp: SystemTime::now(),
};
self.gossip.message_cache.push(message);
Ok(())
}
fn request_package(&mut self, package_id: &str) -> Result<()> {
let providers = self.get_providers(package_id);
if providers.is_empty() {
return Err(anyhow!("No providers found for package: {}", package_id));
}
let provider = &providers[0];
let request = PendingRequest {
request_type: RequestType::GetPackage(package_id.to_string()),
target_peer: provider.peer_id.clone(),
timestamp: SystemTime::now(),
};
let request_id = uuid::Uuid::new_v4().to_string();
self.request_response.pending_requests.insert(request_id, request);
Ok(())
}
pub fn get_stats(&self) -> NetworkStats {
NetworkStats {
connected_peers: self.peers.values().filter(|p| p.connected).count(),
total_peers: self.peers.len(),
cached_packages: self.package_cache.len(),
provider_records: self.dht.provider_records.len(),
gossip_topics: self.gossip.topics.len(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkStats {
pub connected_peers: usize,
pub total_peers: usize,
pub cached_packages: usize,
pub provider_records: usize,
pub gossip_topics: usize,
}
impl Default for MarketplaceBehaviour {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::p2p::types::Package;
#[test]
fn test_publish_package() {
let mut behaviour = MarketplaceBehaviour::new();
let package = Package::new("test-package".to_string(), "1.0.0".to_string());
let result = behaviour.publish_package(package.clone());
assert!(result.is_ok());
assert!(behaviour.package_cache.contains_key(&package.id));
}
#[test]
fn test_search_packages() {
let mut behaviour = MarketplaceBehaviour::new();
let package = Package::new("rust-test".to_string(), "1.0.0".to_string());
behaviour.publish_package(package).unwrap();
let query = Query::new(vec!["rust".to_string()]);
let results = behaviour.search_packages(query).unwrap();
assert_eq!(results.len(), 1);
}
#[test]
fn test_peer_management() {
let mut behaviour = MarketplaceBehaviour::new();
behaviour.add_peer(
"peer1".to_string(),
vec!["/ip4/127.0.0.1/tcp/4001".to_string()],
);
assert_eq!(behaviour.peers.len(), 1);
assert!(behaviour.peers.get("peer1").unwrap().connected);
behaviour.remove_peer("peer1");
assert!(!behaviour.peers.get("peer1").unwrap().connected);
}
}