Skip to main content

ldp_protocol/
session_manager.rs

//! LDP session cache and lifecycle management.
//!
//! The session manager is the key architectural component that makes LDP
//! sessions transparent to JamJet's workflow engine. From the outside,
//! `invoke()` is a plain request→response call. Internally, the session
//! manager:
//!
//! 1. Checks whether an active session is cached for the remote URL.
//! 2. If not, runs HELLO → CAPABILITY_MANIFEST → SESSION_PROPOSE → SESSION_ACCEPT.
//! 3. Caches the resulting session.
//! 4. Returns the active session for task submission.
11
12use crate::client::LdpClient;
13use crate::config::LdpAdapterConfig;
14use crate::types::messages::{LdpEnvelope, LdpMessageBody};
15use crate::types::payload::{negotiate_payload_mode, PayloadMode};
16use crate::types::session::{LdpSession, SessionState};
17use chrono::Utc;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use tracing::{debug, info};
22
23/// Manages LDP session lifecycle and caching.
24///
25/// Thread-safe: uses `RwLock` internally.
26#[derive(Clone)]
27pub struct SessionManager {
28    /// Active sessions keyed by remote URL.
29    sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
30    /// LDP HTTP client for protocol messages.
31    client: LdpClient,
32    /// Adapter configuration.
33    config: LdpAdapterConfig,
34}
35
36impl SessionManager {
37    /// Create a new session manager.
38    pub fn new(client: LdpClient, config: LdpAdapterConfig) -> Self {
39        Self {
40            sessions: Arc::new(RwLock::new(HashMap::new())),
41            client,
42            config,
43        }
44    }
45
46    /// Get or establish a session for the given remote URL.
47    ///
48    /// If an active, non-expired session exists, returns it.
49    /// Otherwise, runs the full handshake sequence.
50    pub async fn get_or_establish(&self, url: &str) -> Result<LdpSession, String> {
51        // Check for existing active session.
52        {
53            let sessions = self.sessions.read().await;
54            if let Some(session) = sessions.get(url) {
55                if session.is_active() {
56                    debug!(url = %url, session_id = %session.session_id, "Reusing existing LDP session");
57                    return Ok(session.clone());
58                }
59                debug!(url = %url, "Existing session expired or inactive, establishing new one");
60            }
61        }
62
63        // Establish a new session.
64        let session = self.establish_session(url).await?;
65
66        // Cache it.
67        {
68            let mut sessions = self.sessions.write().await;
69            sessions.insert(url.to_string(), session.clone());
70        }
71
72        Ok(session)
73    }
74
75    /// Run the full LDP session establishment handshake:
76    /// HELLO → CAPABILITY_MANIFEST → SESSION_PROPOSE → SESSION_ACCEPT
77    async fn establish_session(&self, url: &str) -> Result<LdpSession, String> {
78        let session_config = &self.config.session;
79        let our_delegate_id = &self.config.delegate_id;
80
81        info!(url = %url, "Establishing new LDP session");
82
83        // Step 1: Send HELLO
84        let mut hello = LdpEnvelope::new(
85            "", // No session yet
86            our_delegate_id,
87            url,
88            LdpMessageBody::Hello {
89                delegate_id: our_delegate_id.clone(),
90                supported_modes: session_config.preferred_payload_modes.clone(),
91            },
92            PayloadMode::Text,
93        );
94
95        // Sign if configured
96        if let Some(ref secret) = self.config.signing_secret {
97            crate::signing::apply_signature(&mut hello, secret);
98        }
99
100        let hello_response = self.client.send_message(url, &hello).await?;
101
102        // Step 2: Parse CAPABILITY_MANIFEST response
103        let remote_modes = match &hello_response.body {
104            LdpMessageBody::CapabilityManifest { capabilities } => {
105                // Extract supported modes from capability manifest.
106                capabilities
107                    .get("supported_modes")
108                    .and_then(|v| serde_json::from_value::<Vec<PayloadMode>>(v.clone()).ok())
109                    .unwrap_or_else(|| vec![PayloadMode::Text])
110            }
111            other => {
112                return Err(format!(
113                    "Expected CAPABILITY_MANIFEST response to HELLO, got: {:?}",
114                    std::mem::discriminant(other)
115                ));
116            }
117        };
118
119        let remote_delegate_id = hello_response.from.clone();
120
121        // Step 3: Negotiate payload mode
122        let negotiated =
123            negotiate_payload_mode(&session_config.preferred_payload_modes, &remote_modes);
124
125        debug!(
126            mode = %negotiated.mode,
127            fallbacks = ?negotiated.fallback_chain,
128            "Payload mode negotiated"
129        );
130
131        // Step 4: Send SESSION_PROPOSE (trust is enforced in discover())
132        let session_id = uuid::Uuid::new_v4().to_string();
133        let mut propose = LdpEnvelope::new(
134            &session_id,
135            our_delegate_id,
136            &remote_delegate_id,
137            LdpMessageBody::SessionPropose {
138                config: serde_json::json!({
139                    "payload_mode": negotiated.mode,
140                    "ttl_secs": session_config.ttl_secs,
141                    "trust_domain": self.config.trust_domain.name,
142                }),
143            },
144            PayloadMode::Text,
145        );
146
147        // Sign if configured
148        if let Some(ref secret) = self.config.signing_secret {
149            crate::signing::apply_signature(&mut propose, secret);
150        }
151
152        let propose_response = self.client.send_message(url, &propose).await?;
153
154        // Step 6: Handle SESSION_ACCEPT or SESSION_REJECT
155        match &propose_response.body {
156            LdpMessageBody::SessionAccept {
157                session_id: accepted_id,
158                negotiated_mode,
159            } => {
160                info!(
161                    session_id = %accepted_id,
162                    mode = %negotiated_mode,
163                    "LDP session established"
164                );
165
166                let now = Utc::now();
167                Ok(LdpSession {
168                    session_id: accepted_id.clone(),
169                    remote_url: url.to_string(),
170                    remote_delegate_id,
171                    state: SessionState::Active,
172                    payload: negotiated,
173                    trust_domain: self.config.trust_domain.clone(),
174                    created_at: now,
175                    last_used: now,
176                    ttl_secs: session_config.ttl_secs,
177                    task_count: 0,
178                })
179            }
180            LdpMessageBody::SessionReject { reason, error } => {
181                if let Some(err) = error {
182                    Err(format!("Session rejected: [{}] {}", err.code, err.message))
183                } else {
184                    Err(format!("Session rejected by remote: {}", reason))
185                }
186            }
187            other => Err(format!(
188                "Expected SESSION_ACCEPT/REJECT, got: {:?}",
189                std::mem::discriminant(other)
190            )),
191        }
192    }
193
194    /// Mark a session as used (touch timestamp, increment task count).
195    pub async fn touch(&self, url: &str) {
196        let mut sessions = self.sessions.write().await;
197        if let Some(session) = sessions.get_mut(url) {
198            session.touch();
199            session.task_count += 1;
200        }
201    }
202
203    /// Close a session.
204    pub async fn close(&self, url: &str) -> Result<(), String> {
205        let session = {
206            let mut sessions = self.sessions.write().await;
207            sessions.remove(url)
208        };
209
210        if let Some(session) = session {
211            let close_msg = LdpEnvelope::new(
212                &session.session_id,
213                &self.config.delegate_id,
214                &session.remote_delegate_id,
215                LdpMessageBody::SessionClose { reason: None },
216                session.payload.mode,
217            );
218            // Best-effort close — don't fail if remote is unreachable.
219            let _ = self.client.send_message(url, &close_msg).await;
220            info!(session_id = %session.session_id, "LDP session closed");
221        }
222
223        Ok(())
224    }
225
226    /// Close all sessions.
227    pub async fn close_all(&self) {
228        let urls: Vec<String> = {
229            let sessions = self.sessions.read().await;
230            sessions.keys().cloned().collect()
231        };
232        for url in urls {
233            let _ = self.close(&url).await;
234        }
235    }
236
237    /// Get the number of active sessions.
238    pub async fn active_count(&self) -> usize {
239        let sessions = self.sessions.read().await;
240        sessions.values().filter(|s| s.is_active()).count()
241    }
242}