Skip to main content

ldp_protocol/
session_manager.rs

1//! LDP session cache and lifecycle management.
2//!
3//! The session manager is the key architectural component that makes LDP
4//! sessions transparent to JamJet's workflow engine. From the outside,
5//! `invoke()` is request→response. Internally, the session manager handles:
6//!
7//! 1. Check if a session exists for (url, config) pair
8//! 2. If not, run HELLO → CAPABILITY_MANIFEST → SESSION_PROPOSE → SESSION_ACCEPT
9//! 3. Cache the session
10//! 4. Return the active session for task submission
11
12use crate::client::LdpClient;
13use crate::config::LdpAdapterConfig;
14use crate::types::messages::{LdpEnvelope, LdpMessageBody};
15use crate::types::payload::{negotiate_payload_mode, PayloadMode};
16use crate::types::session::{LdpSession, SessionState};
17use chrono::Utc;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use tracing::{debug, info};
22
23/// Manages LDP session lifecycle and caching.
24///
25/// Thread-safe: uses `RwLock` internally.
26#[derive(Clone)]
27pub struct SessionManager {
28    /// Active sessions keyed by remote URL.
29    sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
30    /// LDP HTTP client for protocol messages.
31    client: LdpClient,
32    /// Adapter configuration.
33    config: LdpAdapterConfig,
34}
35
36impl SessionManager {
37    /// Create a new session manager.
38    pub fn new(client: LdpClient, config: LdpAdapterConfig) -> Self {
39        Self {
40            sessions: Arc::new(RwLock::new(HashMap::new())),
41            client,
42            config,
43        }
44    }
45
46    /// Get or establish a session for the given remote URL.
47    ///
48    /// If an active, non-expired session exists, returns it.
49    /// Otherwise, runs the full handshake sequence.
50    pub async fn get_or_establish(&self, url: &str) -> Result<LdpSession, String> {
51        // Check for existing active session.
52        {
53            let sessions = self.sessions.read().await;
54            if let Some(session) = sessions.get(url) {
55                if session.is_active() {
56                    debug!(url = %url, session_id = %session.session_id, "Reusing existing LDP session");
57                    return Ok(session.clone());
58                }
59                debug!(url = %url, "Existing session expired or inactive, establishing new one");
60            }
61        }
62
63        // Establish a new session.
64        let session = self.establish_session(url).await?;
65
66        // Cache it.
67        {
68            let mut sessions = self.sessions.write().await;
69            sessions.insert(url.to_string(), session.clone());
70        }
71
72        Ok(session)
73    }
74
75    /// Run the full LDP session establishment handshake:
76    /// HELLO → CAPABILITY_MANIFEST → SESSION_PROPOSE → SESSION_ACCEPT
77    async fn establish_session(&self, url: &str) -> Result<LdpSession, String> {
78        let session_config = &self.config.session;
79        let our_delegate_id = &self.config.delegate_id;
80
81        info!(url = %url, "Establishing new LDP session");
82
83        // Step 1: Send HELLO
84        let mut hello = LdpEnvelope::new(
85            "", // No session yet
86            our_delegate_id,
87            url,
88            LdpMessageBody::Hello {
89                delegate_id: our_delegate_id.clone(),
90                supported_modes: session_config.preferred_payload_modes.clone(),
91            },
92            PayloadMode::Text,
93        );
94
95        // Sign if configured
96        if let Some(ref secret) = self.config.signing_secret {
97            crate::signing::apply_signature(&mut hello, secret);
98        }
99
100        let hello_response = self.client.send_message(url, &hello).await?;
101
102        // Step 2: Parse CAPABILITY_MANIFEST response
103        let remote_modes = match &hello_response.body {
104            LdpMessageBody::CapabilityManifest { capabilities } => {
105                // Extract supported modes from capability manifest.
106                capabilities
107                    .get("supported_modes")
108                    .and_then(|v| serde_json::from_value::<Vec<PayloadMode>>(v.clone()).ok())
109                    .unwrap_or_else(|| vec![PayloadMode::Text])
110            }
111            other => {
112                return Err(format!(
113                    "Expected CAPABILITY_MANIFEST response to HELLO, got: {:?}",
114                    std::mem::discriminant(other)
115                ));
116            }
117        };
118
119        let remote_delegate_id = hello_response.from.clone();
120
121        // Step 3: Negotiate payload mode
122        let negotiated = negotiate_payload_mode(
123            &session_config.preferred_payload_modes,
124            &remote_modes,
125        );
126
127        debug!(
128            mode = %negotiated.mode,
129            fallbacks = ?negotiated.fallback_chain,
130            "Payload mode negotiated"
131        );
132
133        // Step 4: Send SESSION_PROPOSE (trust is enforced in discover())
134        let session_id = uuid::Uuid::new_v4().to_string();
135        let mut propose = LdpEnvelope::new(
136            &session_id,
137            our_delegate_id,
138            &remote_delegate_id,
139            LdpMessageBody::SessionPropose {
140                config: serde_json::json!({
141                    "payload_mode": negotiated.mode,
142                    "ttl_secs": session_config.ttl_secs,
143                    "trust_domain": self.config.trust_domain.name,
144                }),
145            },
146            PayloadMode::Text,
147        );
148
149        // Sign if configured
150        if let Some(ref secret) = self.config.signing_secret {
151            crate::signing::apply_signature(&mut propose, secret);
152        }
153
154        let propose_response = self.client.send_message(url, &propose).await?;
155
156        // Step 6: Handle SESSION_ACCEPT or SESSION_REJECT
157        match &propose_response.body {
158            LdpMessageBody::SessionAccept {
159                session_id: accepted_id,
160                negotiated_mode,
161            } => {
162                info!(
163                    session_id = %accepted_id,
164                    mode = %negotiated_mode,
165                    "LDP session established"
166                );
167
168                let now = Utc::now();
169                Ok(LdpSession {
170                    session_id: accepted_id.clone(),
171                    remote_url: url.to_string(),
172                    remote_delegate_id,
173                    state: SessionState::Active,
174                    payload: negotiated,
175                    trust_domain: self.config.trust_domain.clone(),
176                    created_at: now,
177                    last_used: now,
178                    ttl_secs: session_config.ttl_secs,
179                    task_count: 0,
180                })
181            }
182            LdpMessageBody::SessionReject { reason, error } => {
183                if let Some(err) = error {
184                    Err(format!("Session rejected: [{}] {}", err.code, err.message))
185                } else {
186                    Err(format!("Session rejected by remote: {}", reason))
187                }
188            }
189            other => Err(format!(
190                "Expected SESSION_ACCEPT/REJECT, got: {:?}",
191                std::mem::discriminant(other)
192            )),
193        }
194    }
195
196    /// Mark a session as used (touch timestamp, increment task count).
197    pub async fn touch(&self, url: &str) {
198        let mut sessions = self.sessions.write().await;
199        if let Some(session) = sessions.get_mut(url) {
200            session.touch();
201            session.task_count += 1;
202        }
203    }
204
205    /// Close a session.
206    pub async fn close(&self, url: &str) -> Result<(), String> {
207        let session = {
208            let mut sessions = self.sessions.write().await;
209            sessions.remove(url)
210        };
211
212        if let Some(session) = session {
213            let close_msg = LdpEnvelope::new(
214                &session.session_id,
215                &self.config.delegate_id,
216                &session.remote_delegate_id,
217                LdpMessageBody::SessionClose { reason: None },
218                session.payload.mode,
219            );
220            // Best-effort close — don't fail if remote is unreachable.
221            let _ = self.client.send_message(url, &close_msg).await;
222            info!(session_id = %session.session_id, "LDP session closed");
223        }
224
225        Ok(())
226    }
227
228    /// Close all sessions.
229    pub async fn close_all(&self) {
230        let urls: Vec<String> = {
231            let sessions = self.sessions.read().await;
232            sessions.keys().cloned().collect()
233        };
234        for url in urls {
235            let _ = self.close(&url).await;
236        }
237    }
238
239    /// Get the number of active sessions.
240    pub async fn active_count(&self) -> usize {
241        let sessions = self.sessions.read().await;
242        sessions.values().filter(|s| s.is_active()).count()
243    }
244}