ldp_protocol/
session_manager.rs1use crate::client::LdpClient;
13use crate::config::LdpAdapterConfig;
14use crate::types::messages::{LdpEnvelope, LdpMessageBody};
15use crate::types::payload::{negotiate_payload_mode, PayloadMode};
16use crate::types::session::{LdpSession, SessionState};
17use chrono::Utc;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use tracing::{debug, info};
22
23#[derive(Clone)]
27pub struct SessionManager {
28 sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
30 client: LdpClient,
32 config: LdpAdapterConfig,
34}
35
36impl SessionManager {
37 pub fn new(client: LdpClient, config: LdpAdapterConfig) -> Self {
39 Self {
40 sessions: Arc::new(RwLock::new(HashMap::new())),
41 client,
42 config,
43 }
44 }
45
46 pub async fn get_or_establish(&self, url: &str) -> Result<LdpSession, String> {
51 {
53 let sessions = self.sessions.read().await;
54 if let Some(session) = sessions.get(url) {
55 if session.is_active() {
56 debug!(url = %url, session_id = %session.session_id, "Reusing existing LDP session");
57 return Ok(session.clone());
58 }
59 debug!(url = %url, "Existing session expired or inactive, establishing new one");
60 }
61 }
62
63 let session = self.establish_session(url).await?;
65
66 {
68 let mut sessions = self.sessions.write().await;
69 sessions.insert(url.to_string(), session.clone());
70 }
71
72 Ok(session)
73 }
74
75 async fn establish_session(&self, url: &str) -> Result<LdpSession, String> {
78 let session_config = &self.config.session;
79 let our_delegate_id = &self.config.delegate_id;
80
81 info!(url = %url, "Establishing new LDP session");
82
83 let mut hello = LdpEnvelope::new(
85 "", our_delegate_id,
87 url,
88 LdpMessageBody::Hello {
89 delegate_id: our_delegate_id.clone(),
90 supported_modes: session_config.preferred_payload_modes.clone(),
91 },
92 PayloadMode::Text,
93 );
94
95 if let Some(ref secret) = self.config.signing_secret {
97 crate::signing::apply_signature(&mut hello, secret);
98 }
99
100 let hello_response = self.client.send_message(url, &hello).await?;
101
102 let remote_modes = match &hello_response.body {
104 LdpMessageBody::CapabilityManifest { capabilities } => {
105 capabilities
107 .get("supported_modes")
108 .and_then(|v| serde_json::from_value::<Vec<PayloadMode>>(v.clone()).ok())
109 .unwrap_or_else(|| vec![PayloadMode::Text])
110 }
111 other => {
112 return Err(format!(
113 "Expected CAPABILITY_MANIFEST response to HELLO, got: {:?}",
114 std::mem::discriminant(other)
115 ));
116 }
117 };
118
119 let remote_delegate_id = hello_response.from.clone();
120
121 let negotiated =
123 negotiate_payload_mode(&session_config.preferred_payload_modes, &remote_modes);
124
125 debug!(
126 mode = %negotiated.mode,
127 fallbacks = ?negotiated.fallback_chain,
128 "Payload mode negotiated"
129 );
130
131 let session_id = uuid::Uuid::new_v4().to_string();
133 let mut propose = LdpEnvelope::new(
134 &session_id,
135 our_delegate_id,
136 &remote_delegate_id,
137 LdpMessageBody::SessionPropose {
138 config: serde_json::json!({
139 "payload_mode": negotiated.mode,
140 "ttl_secs": session_config.ttl_secs,
141 "trust_domain": self.config.trust_domain.name,
142 }),
143 },
144 PayloadMode::Text,
145 );
146
147 if let Some(ref secret) = self.config.signing_secret {
149 crate::signing::apply_signature(&mut propose, secret);
150 }
151
152 let propose_response = self.client.send_message(url, &propose).await?;
153
154 match &propose_response.body {
156 LdpMessageBody::SessionAccept {
157 session_id: accepted_id,
158 negotiated_mode,
159 } => {
160 info!(
161 session_id = %accepted_id,
162 mode = %negotiated_mode,
163 "LDP session established"
164 );
165
166 let now = Utc::now();
167 Ok(LdpSession {
168 session_id: accepted_id.clone(),
169 remote_url: url.to_string(),
170 remote_delegate_id,
171 state: SessionState::Active,
172 payload: negotiated,
173 trust_domain: self.config.trust_domain.clone(),
174 created_at: now,
175 last_used: now,
176 ttl_secs: session_config.ttl_secs,
177 task_count: 0,
178 })
179 }
180 LdpMessageBody::SessionReject { reason, error } => {
181 if let Some(err) = error {
182 Err(format!("Session rejected: [{}] {}", err.code, err.message))
183 } else {
184 Err(format!("Session rejected by remote: {}", reason))
185 }
186 }
187 other => Err(format!(
188 "Expected SESSION_ACCEPT/REJECT, got: {:?}",
189 std::mem::discriminant(other)
190 )),
191 }
192 }
193
194 pub async fn touch(&self, url: &str) {
196 let mut sessions = self.sessions.write().await;
197 if let Some(session) = sessions.get_mut(url) {
198 session.touch();
199 session.task_count += 1;
200 }
201 }
202
203 pub async fn close(&self, url: &str) -> Result<(), String> {
205 let session = {
206 let mut sessions = self.sessions.write().await;
207 sessions.remove(url)
208 };
209
210 if let Some(session) = session {
211 let close_msg = LdpEnvelope::new(
212 &session.session_id,
213 &self.config.delegate_id,
214 &session.remote_delegate_id,
215 LdpMessageBody::SessionClose { reason: None },
216 session.payload.mode,
217 );
218 let _ = self.client.send_message(url, &close_msg).await;
220 info!(session_id = %session.session_id, "LDP session closed");
221 }
222
223 Ok(())
224 }
225
226 pub async fn close_all(&self) {
228 let urls: Vec<String> = {
229 let sessions = self.sessions.read().await;
230 sessions.keys().cloned().collect()
231 };
232 for url in urls {
233 let _ = self.close(&url).await;
234 }
235 }
236
237 pub async fn active_count(&self) -> usize {
239 let sessions = self.sessions.read().await;
240 sessions.values().filter(|s| s.is_active()).count()
241 }
242}