use crate::{Result, WantListEntry, WantType};
use cid::Cid;
use helia_interface::HeliaError;
use libp2p::PeerId;
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// State for one block-fetching session: the CIDs it is interested in, the
/// blocks it has been told were received, the peers attached to it, and
/// running counters.
///
/// A session is considered expired once the time since `last_activity`
/// exceeds `timeout`; mutating operations refresh `last_activity`.
pub struct Session {
/// Identifier handed out at construction and returned by `id()`.
id: String,
/// CIDs this session has registered interest in. Entries are removed when a
/// block is marked received or failed, and the set is cleared on `close()`.
interests: HashSet<Cid>,
/// CIDs that have been reported via `mark_block_received`.
received_blocks: HashSet<Cid>,
/// Peers attached to this session; `add_peer` refuses new peers once
/// `max_peers` is reached.
peers: HashSet<PeerId>,
/// Instant at which the session was constructed; used to compute
/// `SessionStats::duration`.
created_at: Instant,
/// Instant of the most recent activity-refreshing operation.
last_activity: Instant,
/// Maximum idle time (measured from `last_activity`) before the session
/// counts as expired.
timeout: Duration,
/// `true` from construction until `close()` is called.
active: bool,
/// Priority copied into every wantlist entry this session generates.
priority: i32,
/// Upper bound on the size of `peers`.
max_peers: usize,
/// Running counters; `duration` and `average_response_time_ms` are filled
/// in on demand by `statistics()`.
stats: SessionStats,
}
/// Counters describing what a single [`Session`] has done so far.
///
/// All fields start at their zero value (`Default`).
#[derive(Debug, Clone, Default)]
pub struct SessionStats {
/// Number of block requests recorded through `Session::add_interest`.
pub blocks_requested: u64,
/// Number of blocks recorded through `Session::mark_block_received`.
pub blocks_received: u64,
/// Number of blocks recorded through `Session::mark_block_failed`.
pub blocks_failed: u64,
/// Sum of the `size` arguments passed to `Session::mark_block_received`.
pub bytes_received: u64,
/// Number of peers successfully added through `Session::add_peer`.
pub peers_tried: u64,
/// Session lifetime in milliseconds divided by `blocks_received`; only
/// computed by `Session::statistics()` and only when at least one block has
/// been received. This is a coarse average, not a per-request measurement.
pub average_response_time_ms: f64,
/// Time elapsed since the session was created, as of the
/// `Session::statistics()` call that produced this snapshot.
pub duration: Duration,
}
/// Tunables applied when a [`Session`] is created.
#[derive(Debug, Clone)]
pub struct SessionConfig {
/// Idle time after which a session is treated as expired.
pub timeout: Duration,
/// Initial priority stamped on the session's wantlist entries.
pub priority: i32,
/// Maximum number of peers a session will accept.
pub max_peers: usize,
/// Whether wants should be rebroadcast.
/// NOTE(review): not read by `Session::new` or anything else in this file;
/// presumably consumed by a rebroadcast loop elsewhere — confirm.
pub rebroadcast_wants: bool,
/// Interval between rebroadcasts.
/// NOTE(review): not read anywhere in this file — confirm the consumer.
pub rebroadcast_interval: Duration,
}
impl Default for SessionConfig {
fn default() -> Self {
Self {
timeout: Duration::from_secs(300), priority: 1,
max_peers: 10,
rebroadcast_wants: true,
rebroadcast_interval: Duration::from_secs(30),
}
}
}
impl Session {
    /// Creates an active session with no interests, peers, or received blocks.
    ///
    /// `timeout`, `priority`, and `max_peers` are taken from `config`; the
    /// remaining `config` fields are not used here.
    pub fn new(id: String, config: SessionConfig) -> Self {
        let now = Instant::now();
        Self {
            id,
            interests: HashSet::new(),
            received_blocks: HashSet::new(),
            peers: HashSet::new(),
            created_at: now,
            last_activity: now,
            timeout: config.timeout,
            active: true,
            priority: config.priority,
            max_peers: config.max_peers,
            stats: SessionStats::default(),
        }
    }

    /// Returns the session identifier.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Returns `true` while the session has not been closed and has not
    /// exceeded its idle timeout.
    pub fn is_active(&self) -> bool {
        self.active && !self.is_expired()
    }

    /// Returns `true` once more than `timeout` has passed since the last
    /// recorded activity.
    pub fn is_expired(&self) -> bool {
        self.last_activity.elapsed() > self.timeout
    }

    /// Registers interest in `cid` and refreshes the activity timestamp.
    ///
    /// `blocks_requested` is incremented only when `cid` was not already in
    /// the interest set, so repeated calls for the same pending CID do not
    /// inflate the request count (and thereby skew the completion ratio).
    pub fn add_interest(&mut self, cid: Cid) {
        debug!("Session {} adding interest in {}", self.id, cid);
        if self.interests.insert(cid) {
            self.stats.blocks_requested += 1;
        }
        self.update_activity();
    }

    /// Drops interest in `cid` (a no-op for the set if it was absent) and
    /// refreshes the activity timestamp.
    pub fn remove_interest(&mut self, cid: &Cid) {
        debug!("Session {} removing interest in {}", self.id, cid);
        self.interests.remove(cid);
        self.update_activity();
    }

    /// Returns every CID currently in the interest set.
    pub fn interests(&self) -> &HashSet<Cid> {
        &self.interests
    }

    /// Returns the interests that have not been marked as received.
    pub fn pending_interests(&self) -> HashSet<Cid> {
        self.interests
            .difference(&self.received_blocks)
            .copied()
            .collect()
    }

    /// Records that the block for `cid` arrived with `size` bytes.
    ///
    /// The CID is moved from the interest set to the received set, the
    /// activity timestamp is refreshed, and the received block/byte counters
    /// are advanced.
    pub fn mark_block_received(&mut self, cid: &Cid, size: usize) {
        debug!("Session {} received block {}", self.id, cid);
        self.received_blocks.insert(*cid);
        self.interests.remove(cid);
        self.update_activity();
        self.stats.blocks_received += 1;
        self.stats.bytes_received += size as u64;
    }

    /// Records that fetching `cid` failed: the interest is dropped, the
    /// activity timestamp is refreshed, and `blocks_failed` is incremented.
    pub fn mark_block_failed(&mut self, cid: &Cid) {
        warn!("Session {} failed to get block {}", self.id, cid);
        self.interests.remove(cid);
        self.update_activity();
        self.stats.blocks_failed += 1;
    }

    /// Attaches `peer_id` to the session.
    ///
    /// Returns `true` if the peer was newly added. Returns `false` when the
    /// session already holds `max_peers` peers or the peer was already
    /// attached; in both cases nothing changes.
    pub fn add_peer(&mut self, peer_id: PeerId) -> bool {
        if self.peers.len() >= self.max_peers {
            return false;
        }
        let newly_added = self.peers.insert(peer_id);
        if newly_added {
            debug!("Session {} added peer {}", self.id, peer_id);
            self.update_activity();
            self.stats.peers_tried += 1;
        }
        newly_added
    }

    /// Detaches `peer_id`; activity is refreshed only if the peer was present.
    pub fn remove_peer(&mut self, peer_id: &PeerId) {
        if self.peers.remove(peer_id) {
            debug!("Session {} removed peer {}", self.id, peer_id);
            self.update_activity();
        }
    }

    /// Returns the peers currently attached to the session.
    pub fn peers(&self) -> &HashSet<PeerId> {
        &self.peers
    }

    /// Returns the priority used for this session's wantlist entries.
    pub fn priority(&self) -> i32 {
        self.priority
    }

    /// Changes the session priority and refreshes the activity timestamp.
    pub fn set_priority(&mut self, priority: i32) {
        self.priority = priority;
        self.update_activity();
    }

    /// Builds one non-cancel `WantType::Block` entry (with `send_dont_have`
    /// set) per pending interest, each carrying the session priority.
    ///
    /// The order of the returned entries is unspecified.
    pub fn generate_wantlist(&self) -> Vec<WantListEntry> {
        // Walk the set difference directly instead of materialising an
        // intermediate `HashSet` through `pending_interests()`.
        self.interests
            .difference(&self.received_blocks)
            .map(|&cid| WantListEntry {
                cid,
                priority: self.priority,
                want_type: WantType::Block,
                cancel: false,
                send_dont_have: true,
            })
            .collect()
    }

    /// Returns `true` if `cid` is an interest that has not been received.
    pub fn wants_block(&self, cid: &Cid) -> bool {
        self.interests.contains(cid) && !self.received_blocks.contains(cid)
    }

    /// Deactivates the session and forgets its interests and peers.
    ///
    /// The received-block set and the counters are left intact.
    pub fn close(&mut self) {
        info!("Closing session {}", self.id);
        self.active = false;
        self.interests.clear();
        self.peers.clear();
    }

    /// Stamps "now" as the most recent activity, pushing back expiry.
    fn update_activity(&mut self) {
        self.last_activity = Instant::now();
    }

    /// Returns a snapshot of the counters with `duration` set to the time
    /// since creation.
    ///
    /// When at least one block has been received,
    /// `average_response_time_ms` is the session duration in (fractional)
    /// milliseconds divided by the number of received blocks.
    pub fn statistics(&self) -> SessionStats {
        let mut stats = self.stats.clone();
        stats.duration = self.created_at.elapsed();
        if stats.blocks_received > 0 {
            // `as_secs_f64` keeps sub-millisecond precision that
            // `as_millis()` would truncate away for short-lived sessions.
            let elapsed_ms = stats.duration.as_secs_f64() * 1000.0;
            stats.average_response_time_ms = elapsed_ms / stats.blocks_received as f64;
        }
        stats
    }

    /// Returns received blocks as a percentage of requested blocks, in the
    /// range `0.0..=100.0`.
    ///
    /// A session that has requested nothing is reported as 100% complete.
    pub fn completion_percentage(&self) -> f64 {
        if self.stats.blocks_requested == 0 {
            return 100.0;
        }
        let ratio = self.stats.blocks_received as f64 / self.stats.blocks_requested as f64;
        // Blocks reported more than once (or never requested) can push the raw
        // ratio above 1; a completion figure above 100% is meaningless.
        (ratio * 100.0).min(100.0)
    }
}
/// Owns a set of [`Session`]s keyed by their identifier and hands out new
/// session ids.
pub struct SessionManager {
/// Live sessions, keyed by session id.
sessions: HashMap<String, Session>,
/// Monotonic counter used to build ids of the form `session-<n>`.
session_counter: u64,
/// Configuration applied by `create_session()`.
default_config: SessionConfig,
/// Set by `start()` and cleared by `stop()`; sessions can only be created
/// while this is `true`.
running: bool,
}
impl SessionManager {
pub fn new() -> Self {
Self {
sessions: HashMap::new(),
session_counter: 0,
default_config: SessionConfig::default(),
running: false,
}
}
pub fn with_config(config: SessionConfig) -> Self {
Self {
sessions: HashMap::new(),
session_counter: 0,
default_config: config,
running: false,
}
}
pub async fn start(&mut self) -> Result<()> {
if self.running {
return Ok(());
}
info!("Starting session manager");
self.running = true;
Ok(())
}
pub async fn stop(&mut self) -> Result<()> {
if !self.running {
return Ok(());
}
info!("Stopping session manager");
for session in self.sessions.values_mut() {
session.close();
}
self.sessions.clear();
self.running = false;
Ok(())
}
pub fn create_session(&mut self) -> Result<String> {
self.create_session_with_config(self.default_config.clone())
}
pub fn create_session_with_config(&mut self, config: SessionConfig) -> Result<String> {
if !self.running {
return Err(HeliaError::other("Session manager not running"));
}
self.session_counter += 1;
let session_id = format!("session-{}", self.session_counter);
let session = Session::new(session_id.clone(), config);
self.sessions.insert(session_id.clone(), session);
info!("Created session {}", session_id);
Ok(session_id)
}
pub fn get_session(&self, session_id: &str) -> Option<&Session> {
self.sessions.get(session_id)
}
pub fn get_session_mut(&mut self, session_id: &str) -> Option<&mut Session> {
self.sessions.get_mut(session_id)
}
pub fn close_session(&mut self, session_id: &str) -> Result<()> {
if let Some(mut session) = self.sessions.remove(session_id) {
session.close();
info!("Closed session {}", session_id);
}
Ok(())
}
pub fn active_sessions(&self) -> Vec<&Session> {
self.sessions.values().filter(|s| s.is_active()).collect()
}
pub fn sessions_wanting_block(&self, cid: &Cid) -> Vec<&Session> {
self.sessions.values()
.filter(|s| s.is_active() && s.wants_block(cid))
.collect()
}
pub fn cleanup_expired_sessions(&mut self) -> usize {
let expired_sessions: Vec<String> = self.sessions.iter()
.filter(|(_, session)| session.is_expired())
.map(|(id, _)| id.clone())
.collect();
let count = expired_sessions.len();
for session_id in expired_sessions {
warn!("Removing expired session: {}", session_id);
let _ = self.close_session(&session_id);
}
count
}
pub fn get_all_wantlists(&self) -> HashMap<String, Vec<WantListEntry>> {
self.sessions.iter()
.filter(|(_, session)| session.is_active())
.map(|(id, session)| (id.clone(), session.generate_wantlist()))
.collect()
}
pub fn get_aggregated_wantlist(&self) -> Vec<WantListEntry> {
let mut aggregated: HashMap<Cid, WantListEntry> = HashMap::new();
for session in self.sessions.values() {
if session.is_active() {
for entry in session.generate_wantlist() {
aggregated.entry(entry.cid)
.and_modify(|existing| {
if entry.priority > existing.priority {
existing.priority = entry.priority;
}
})
.or_insert(entry);
}
}
}
aggregated.into_values().collect()
}
pub fn session_count(&self) -> usize {
self.sessions.len()
}
pub fn active_session_count(&self) -> usize {
self.sessions.values().filter(|s| s.is_active()).count()
}
pub fn get_statistics(&self) -> SessionManagerStats {
let mut stats = SessionManagerStats::default();
stats.total_sessions = self.sessions.len();
stats.active_sessions = self.active_session_count();
for session in self.sessions.values() {
let session_stats = session.statistics();
stats.total_blocks_requested += session_stats.blocks_requested;
stats.total_blocks_received += session_stats.blocks_received;
stats.total_blocks_failed += session_stats.blocks_failed;
stats.total_bytes_received += session_stats.bytes_received;
if session.is_active() {
stats.total_pending_interests += session.pending_interests().len() as u64;
}
}
if stats.total_blocks_requested > 0 {
stats.success_rate = (stats.total_blocks_received as f64 / stats.total_blocks_requested as f64) * 100.0;
}
stats
}
}
impl Default for SessionManager {
fn default() -> Self {
Self::new()
}
}
/// Aggregate counters across every session held by a [`SessionManager`], as
/// produced by `SessionManager::get_statistics`.
#[derive(Debug, Clone, Default)]
pub struct SessionManagerStats {
/// Number of sessions held, active or not.
pub total_sessions: usize,
/// Number of held sessions that are active.
pub active_sessions: usize,
/// Sum of `blocks_requested` over all held sessions.
pub total_blocks_requested: u64,
/// Sum of `blocks_received` over all held sessions.
pub total_blocks_received: u64,
/// Sum of `blocks_failed` over all held sessions.
pub total_blocks_failed: u64,
/// Sum of `bytes_received` over all held sessions.
pub total_bytes_received: u64,
/// Number of pending interests, summed over active sessions only.
pub total_pending_interests: u64,
/// Received blocks as a percentage of requested blocks; stays `0.0` when
/// nothing has been requested.
pub success_rate: f64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `start` and `stop` toggle the running flag.
    #[tokio::test]
    async fn test_session_manager_lifecycle() {
        let mut manager = SessionManager::new();
        assert!(!manager.running);
        assert!(manager.start().await.is_ok());
        assert!(manager.running);
        assert!(manager.stop().await.is_ok());
        assert!(!manager.running);
    }

    /// A running manager hands out an active session retrievable by its id.
    #[tokio::test]
    async fn test_session_creation() {
        let mut manager = SessionManager::new();
        manager.start().await.unwrap();
        let session_id = manager.create_session().unwrap();
        assert_eq!(manager.session_count(), 1);
        let session = manager.get_session(&session_id).unwrap();
        assert!(session.is_active());
        assert_eq!(session.id(), session_id);
    }

    /// An interest is wanted until the block is marked as received.
    #[test]
    fn test_session_interests() {
        let mut session = Session::new("test".to_string(), SessionConfig::default());
        let cid = Cid::default();
        assert!(!session.wants_block(&cid));
        session.add_interest(cid);
        assert!(session.wants_block(&cid));
        assert_eq!(session.interests().len(), 1);
        session.mark_block_received(&cid, 100);
        assert!(!session.wants_block(&cid));
        assert_eq!(session.pending_interests().len(), 0);
    }

    /// The wantlist contains one non-cancel entry per pending interest.
    #[test]
    fn test_session_wantlist() {
        let mut session = Session::new("test".to_string(), SessionConfig::default());
        let cid1 = Cid::default();
        session.add_interest(cid1);
        let wantlist = session.generate_wantlist();
        assert_eq!(wantlist.len(), 1);
        assert_eq!(wantlist[0].cid, cid1);
        assert!(!wantlist[0].cancel);
    }

    /// Counters reflect one requested and one received block.
    #[test]
    fn test_session_statistics() {
        let mut session = Session::new("test".to_string(), SessionConfig::default());
        let cid = Cid::default();
        session.add_interest(cid);
        session.mark_block_received(&cid, 100);
        let stats = session.statistics();
        assert_eq!(stats.blocks_requested, 1);
        assert_eq!(stats.blocks_received, 1);
        assert_eq!(stats.bytes_received, 100);
        assert_eq!(session.completion_percentage(), 100.0);
    }

    /// A session whose timeout has elapsed is removed by cleanup.
    #[tokio::test]
    async fn test_session_cleanup() {
        let mut manager = SessionManager::new();
        let config = SessionConfig {
            timeout: Duration::from_millis(1),
            ..Default::default()
        };
        manager.start().await.unwrap();
        let session_id = manager.create_session_with_config(config).unwrap();
        assert_eq!(manager.session_count(), 1);
        // Sleep well past the 1 ms timeout so the session is certainly expired.
        tokio::time::sleep(Duration::from_millis(10)).await;
        let cleaned = manager.cleanup_expired_sessions();
        assert_eq!(cleaned, 1);
        assert_eq!(manager.session_count(), 0);
        // The expired session must no longer be reachable by its id.
        assert!(manager.get_session(&session_id).is_none());
    }
}