use crate::client::LdpClient;
use crate::config::LdpAdapterConfig;
use crate::types::messages::{LdpEnvelope, LdpMessageBody};
use crate::types::payload::{negotiate_payload_mode, PayloadMode};
use crate::types::session::{LdpSession, SessionState};
use chrono::Utc;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info};
/// Caches LDP sessions per remote URL and performs the handshake needed to
/// create them.
///
/// The session map lives behind an `Arc`, so clones produced by the derived
/// `Clone` share the same underlying map.
#[derive(Clone)]
pub struct SessionManager {
/// Cached sessions, keyed by the remote URL they were established with.
sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
/// Client used to send handshake and close envelopes to the remote.
client: LdpClient,
/// Adapter configuration: local delegate id, session settings, trust domain
/// and optional signing secret.
config: LdpAdapterConfig,
}
impl SessionManager {
/// Creates a manager that starts with an empty session cache.
///
/// `client` is used for every message sent to a remote; `config` supplies the
/// local delegate id, session settings, trust domain and optional signing secret.
pub fn new(client: LdpClient, config: LdpAdapterConfig) -> Self {
    let sessions = Arc::new(RwLock::new(HashMap::new()));
    Self {
        sessions,
        client,
        config,
    }
}
/// Returns the cached session for `url` if it is still active, otherwise
/// establishes a new one, stores it under `url`, and returns it.
///
/// # Errors
///
/// Propagates the error string produced by the handshake when a new session
/// cannot be established. In that case the cache is left untouched.
pub async fn get_or_establish(&self, url: &str) -> Result<LdpSession, String> {
    // Probe the cache under a read lock; the guard is dropped at the end of this
    // block so the lock is not held while the handshake is in flight.
    let cached = {
        let guard = self.sessions.read().await;
        match guard.get(url) {
            Some(existing) if existing.is_active() => {
                debug!(url = %url, session_id = %existing.session_id, "Reusing existing LDP session");
                Some(existing.clone())
            }
            Some(_) => {
                debug!(url = %url, "Existing session expired or inactive, establishing new one");
                None
            }
            None => None,
        }
    };
    if let Some(active) = cached {
        return Ok(active);
    }

    // NOTE(review): the cache is not re-checked after the handshake, so two callers
    // that miss concurrently will each establish a session and the later insert wins.
    let fresh = self.establish_session(url).await?;
    self.sessions
        .write()
        .await
        .insert(url.to_string(), fresh.clone());
    Ok(fresh)
}
/// Runs the two-step LDP handshake against `url` and returns the new session.
///
/// 1. Sends `Hello` with our preferred payload modes and expects a
///    `CapabilityManifest` back. The remote's `supported_modes` entry falls back
///    to text-only when it is absent or cannot be decoded.
/// 2. Negotiates a payload mode, sends `SessionPropose`, and expects either
///    `SessionAccept` or `SessionReject`.
///
/// Both outgoing envelopes are signed when a signing secret is configured.
///
/// # Errors
///
/// Returns an error string when a send fails, when either reply carries an
/// unexpected body, or when the remote rejects the proposal.
async fn establish_session(&self, url: &str) -> Result<LdpSession, String> {
    let local_id = &self.config.delegate_id;
    let session_cfg = &self.config.session;
    let signing_secret = self.config.signing_secret.as_ref();

    info!(url = %url, "Establishing new LDP session");

    // Step 1: HELLO. It is sent with an empty session id.
    let mut hello = LdpEnvelope::new(
        "",
        local_id,
        url,
        LdpMessageBody::Hello {
            delegate_id: local_id.clone(),
            supported_modes: session_cfg.preferred_payload_modes.clone(),
        },
        PayloadMode::Text,
    );
    if let Some(secret) = signing_secret {
        crate::signing::apply_signature(&mut hello, secret);
    }
    let hello_reply = self.client.send_message(url, &hello).await?;

    // Anything other than a capability manifest aborts the handshake.
    let manifest = match &hello_reply.body {
        LdpMessageBody::CapabilityManifest { capabilities } => capabilities,
        unexpected => {
            return Err(format!(
                "Expected CAPABILITY_MANIFEST response to HELLO, got: {:?}",
                std::mem::discriminant(unexpected)
            ));
        }
    };
    let remote_modes = manifest
        .get("supported_modes")
        .and_then(|raw| serde_json::from_value::<Vec<PayloadMode>>(raw.clone()).ok())
        .unwrap_or_else(|| vec![PayloadMode::Text]);
    let remote_delegate_id = hello_reply.from.clone();

    let negotiated =
        negotiate_payload_mode(&session_cfg.preferred_payload_modes, &remote_modes);
    debug!(
        mode = %negotiated.mode,
        fallbacks = ?negotiated.fallback_chain,
        "Payload mode negotiated"
    );

    // Step 2: SESSION_PROPOSE, carrying a locally generated session id.
    let proposed_id = uuid::Uuid::new_v4().to_string();
    let mut propose = LdpEnvelope::new(
        &proposed_id,
        local_id,
        &remote_delegate_id,
        LdpMessageBody::SessionPropose {
            config: serde_json::json!({
                "payload_mode": negotiated.mode,
                "ttl_secs": session_cfg.ttl_secs,
                "trust_domain": self.config.trust_domain.name,
            }),
        },
        PayloadMode::Text,
    );
    if let Some(secret) = signing_secret {
        crate::signing::apply_signature(&mut propose, secret);
    }
    let propose_reply = self.client.send_message(url, &propose).await?;

    // Rejections and unexpected bodies return early; only an accept falls through.
    let accepted_id = match &propose_reply.body {
        LdpMessageBody::SessionAccept {
            session_id: accepted,
            negotiated_mode,
        } => {
            info!(
                session_id = %accepted,
                mode = %negotiated_mode,
                "LDP session established"
            );
            accepted
        }
        LdpMessageBody::SessionReject { reason, error } => {
            return Err(match error {
                Some(err) => format!("Session rejected: [{}] {}", err.code, err.message),
                None => format!("Session rejected by remote: {}", reason),
            });
        }
        unexpected => {
            return Err(format!(
                "Expected SESSION_ACCEPT/REJECT, got: {:?}",
                std::mem::discriminant(unexpected)
            ));
        }
    };

    // The session keeps the id returned by the remote, not the proposed one.
    // NOTE(review): the remote's `negotiated_mode` is only logged; the stored payload
    // is the locally negotiated result. Confirm the two are guaranteed to agree.
    let now = Utc::now();
    Ok(LdpSession {
        session_id: accepted_id.clone(),
        remote_url: url.to_string(),
        remote_delegate_id,
        state: SessionState::Active,
        payload: negotiated,
        trust_domain: self.config.trust_domain.clone(),
        created_at: now,
        last_used: now,
        ttl_secs: session_cfg.ttl_secs,
        task_count: 0,
    })
}
/// Records one more use of the session cached for `url`.
///
/// Calls the session's own `touch()` and increments its `task_count`. Does
/// nothing when no session is cached for `url`.
pub async fn touch(&self, url: &str) {
    // The write guard is a temporary of the `if let` scrutinee, so it stays alive
    // for the whole body and is released when the statement ends.
    if let Some(entry) = self.sessions.write().await.get_mut(url) {
        entry.touch();
        entry.task_count += 1;
    }
}
pub async fn close(&self, url: &str) -> Result<(), String> {
let session = {
let mut sessions = self.sessions.write().await;
sessions.remove(url)
};
if let Some(session) = session {
let close_msg = LdpEnvelope::new(
&session.session_id,
&self.config.delegate_id,
&session.remote_delegate_id,
LdpMessageBody::SessionClose { reason: None },
session.payload.mode,
);
let _ = self.client.send_message(url, &close_msg).await;
info!(session_id = %session.session_id, "LDP session closed");
}
Ok(())
}
/// Closes every session that is cached at the moment of the call.
///
/// The URLs are snapshotted first and then closed one by one; results of the
/// individual closes are ignored.
pub async fn close_all(&self) {
    // Copy the keys out so the read lock is released before `close` takes the
    // write lock for each entry.
    let pending: Vec<String> = self.sessions.read().await.keys().cloned().collect();
    for target in &pending {
        let _ = self.close(target).await;
    }
}
/// Returns how many cached sessions currently report themselves as active.
pub async fn active_count(&self) -> usize {
    let guard = self.sessions.read().await;
    // Each active session contributes 1, every other session 0.
    guard
        .values()
        .map(|entry| usize::from(entry.is_active()))
        .sum()
}
}