ldp_protocol/
session_manager.rs1use crate::client::LdpClient;
13use crate::config::LdpAdapterConfig;
14use crate::types::messages::{LdpEnvelope, LdpMessageBody};
15use crate::types::payload::{negotiate_payload_mode, PayloadMode};
16use crate::types::session::{LdpSession, SessionState};
17use chrono::Utc;
18use std::collections::HashMap;
19use std::sync::Arc;
20use tokio::sync::RwLock;
21use tracing::{debug, info};
22
23#[derive(Clone)]
27pub struct SessionManager {
28 sessions: Arc<RwLock<HashMap<String, LdpSession>>>,
30 client: LdpClient,
32 config: LdpAdapterConfig,
34}
35
36impl SessionManager {
37 pub fn new(client: LdpClient, config: LdpAdapterConfig) -> Self {
39 Self {
40 sessions: Arc::new(RwLock::new(HashMap::new())),
41 client,
42 config,
43 }
44 }
45
46 pub async fn get_or_establish(&self, url: &str) -> Result<LdpSession, String> {
51 {
53 let sessions = self.sessions.read().await;
54 if let Some(session) = sessions.get(url) {
55 if session.is_active() {
56 debug!(url = %url, session_id = %session.session_id, "Reusing existing LDP session");
57 return Ok(session.clone());
58 }
59 debug!(url = %url, "Existing session expired or inactive, establishing new one");
60 }
61 }
62
63 let session = self.establish_session(url).await?;
65
66 {
68 let mut sessions = self.sessions.write().await;
69 sessions.insert(url.to_string(), session.clone());
70 }
71
72 Ok(session)
73 }
74
75 async fn establish_session(&self, url: &str) -> Result<LdpSession, String> {
78 let session_config = &self.config.session;
79 let our_delegate_id = &self.config.delegate_id;
80
81 info!(url = %url, "Establishing new LDP session");
82
83 let mut hello = LdpEnvelope::new(
85 "", our_delegate_id,
87 url,
88 LdpMessageBody::Hello {
89 delegate_id: our_delegate_id.clone(),
90 supported_modes: session_config.preferred_payload_modes.clone(),
91 },
92 PayloadMode::Text,
93 );
94
95 if let Some(ref secret) = self.config.signing_secret {
97 crate::signing::apply_signature(&mut hello, secret);
98 }
99
100 let hello_response = self.client.send_message(url, &hello).await?;
101
102 let remote_modes = match &hello_response.body {
104 LdpMessageBody::CapabilityManifest { capabilities } => {
105 capabilities
107 .get("supported_modes")
108 .and_then(|v| serde_json::from_value::<Vec<PayloadMode>>(v.clone()).ok())
109 .unwrap_or_else(|| vec![PayloadMode::Text])
110 }
111 other => {
112 return Err(format!(
113 "Expected CAPABILITY_MANIFEST response to HELLO, got: {:?}",
114 std::mem::discriminant(other)
115 ));
116 }
117 };
118
119 let remote_delegate_id = hello_response.from.clone();
120
121 let negotiated = negotiate_payload_mode(
123 &session_config.preferred_payload_modes,
124 &remote_modes,
125 );
126
127 debug!(
128 mode = %negotiated.mode,
129 fallbacks = ?negotiated.fallback_chain,
130 "Payload mode negotiated"
131 );
132
133 let session_id = uuid::Uuid::new_v4().to_string();
135 let mut propose = LdpEnvelope::new(
136 &session_id,
137 our_delegate_id,
138 &remote_delegate_id,
139 LdpMessageBody::SessionPropose {
140 config: serde_json::json!({
141 "payload_mode": negotiated.mode,
142 "ttl_secs": session_config.ttl_secs,
143 "trust_domain": self.config.trust_domain.name,
144 }),
145 },
146 PayloadMode::Text,
147 );
148
149 if let Some(ref secret) = self.config.signing_secret {
151 crate::signing::apply_signature(&mut propose, secret);
152 }
153
154 let propose_response = self.client.send_message(url, &propose).await?;
155
156 match &propose_response.body {
158 LdpMessageBody::SessionAccept {
159 session_id: accepted_id,
160 negotiated_mode,
161 } => {
162 info!(
163 session_id = %accepted_id,
164 mode = %negotiated_mode,
165 "LDP session established"
166 );
167
168 let now = Utc::now();
169 Ok(LdpSession {
170 session_id: accepted_id.clone(),
171 remote_url: url.to_string(),
172 remote_delegate_id,
173 state: SessionState::Active,
174 payload: negotiated,
175 trust_domain: self.config.trust_domain.clone(),
176 created_at: now,
177 last_used: now,
178 ttl_secs: session_config.ttl_secs,
179 task_count: 0,
180 })
181 }
182 LdpMessageBody::SessionReject { reason, error } => {
183 if let Some(err) = error {
184 Err(format!("Session rejected: [{}] {}", err.code, err.message))
185 } else {
186 Err(format!("Session rejected by remote: {}", reason))
187 }
188 }
189 other => Err(format!(
190 "Expected SESSION_ACCEPT/REJECT, got: {:?}",
191 std::mem::discriminant(other)
192 )),
193 }
194 }
195
196 pub async fn touch(&self, url: &str) {
198 let mut sessions = self.sessions.write().await;
199 if let Some(session) = sessions.get_mut(url) {
200 session.touch();
201 session.task_count += 1;
202 }
203 }
204
205 pub async fn close(&self, url: &str) -> Result<(), String> {
207 let session = {
208 let mut sessions = self.sessions.write().await;
209 sessions.remove(url)
210 };
211
212 if let Some(session) = session {
213 let close_msg = LdpEnvelope::new(
214 &session.session_id,
215 &self.config.delegate_id,
216 &session.remote_delegate_id,
217 LdpMessageBody::SessionClose { reason: None },
218 session.payload.mode,
219 );
220 let _ = self.client.send_message(url, &close_msg).await;
222 info!(session_id = %session.session_id, "LDP session closed");
223 }
224
225 Ok(())
226 }
227
228 pub async fn close_all(&self) {
230 let urls: Vec<String> = {
231 let sessions = self.sessions.read().await;
232 sessions.keys().cloned().collect()
233 };
234 for url in urls {
235 let _ = self.close(&url).await;
236 }
237 }
238
239 pub async fn active_count(&self) -> usize {
241 let sessions = self.sessions.read().await;
242 sessions.values().filter(|s| s.is_active()).count()
243 }
244}