1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt;
3use std::sync::{Arc, Mutex};
4
5use base64::Engine as _;
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use time::OffsetDateTime;
9
10use crate::event_log::{AnyEventLog, EventId, EventLog, LogError, LogEvent, Topic};
11
12pub const SESSIONS_TOPIC: &str = "orchestrator.sessions";
13
14const SESSION_CREATED_KIND: &str = "session_created";
15const SESSION_TOUCHED_KIND: &str = "session_touched";
16const SESSION_EXPIRED_KIND: &str = "session_expired";
17const GENERATED_TOKEN_PREFIX: &str = "harn_sess_";
18const MIN_SESSION_TOKEN_CHARS: usize = 32;
19const TOKEN_RANDOM_BYTES: usize = 32;
20const TOKEN_HANDLE_PREFIX: &str = "sha256:v1:";
21
22pub type SessionAttributes = BTreeMap<String, serde_json::Value>;
23
24#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
25pub struct Session {
26 pub id: String,
27 pub principal: String,
28 #[serde(with = "time::serde::rfc3339")]
29 pub created_at: OffsetDateTime,
30 #[serde(with = "time::serde::rfc3339")]
31 pub last_seen_at: OffsetDateTime,
32 #[serde(with = "time::serde::rfc3339")]
33 pub expires_at: OffsetDateTime,
34 #[serde(default)]
35 pub attributes: SessionAttributes,
36}
37
38impl Session {
39 pub fn is_expired_at(&self, now: OffsetDateTime) -> bool {
40 self.expires_at <= now
41 }
42}
43
44#[derive(Clone, Debug, PartialEq)]
45pub struct CreateSession {
46 pub id: Option<String>,
47 pub principal: String,
48 pub created_at: Option<OffsetDateTime>,
49 pub expires_at: OffsetDateTime,
50 pub attributes: SessionAttributes,
51}
52
53impl CreateSession {
54 pub fn new(principal: impl Into<String>, expires_at: OffsetDateTime) -> Self {
55 Self {
56 id: None,
57 principal: principal.into(),
58 created_at: None,
59 expires_at,
60 attributes: BTreeMap::new(),
61 }
62 }
63}
64
65#[derive(Clone, Debug, PartialEq)]
66pub struct TouchSession {
67 pub id: String,
68 pub last_seen_at: OffsetDateTime,
69 pub expires_at: Option<OffsetDateTime>,
70 pub attributes: Option<SessionAttributes>,
71}
72
73impl TouchSession {
74 pub fn new(id: impl Into<String>, last_seen_at: OffsetDateTime) -> Self {
75 Self {
76 id: id.into(),
77 last_seen_at,
78 expires_at: None,
79 attributes: None,
80 }
81 }
82}
83
84#[derive(Clone, Debug, PartialEq)]
85pub struct ExpireSession {
86 pub id: String,
87 pub expired_at: OffsetDateTime,
88}
89
90impl ExpireSession {
91 pub fn new(id: impl Into<String>, expired_at: OffsetDateTime) -> Self {
92 Self {
93 id: id.into(),
94 expired_at,
95 }
96 }
97}
98
99#[derive(Clone)]
100pub struct SessionStore {
101 event_log: Arc<AnyEventLog>,
102 topic: Topic,
103 projection: Arc<Mutex<SessionProjection>>,
104}
105
106impl SessionStore {
107 pub fn new(event_log: Arc<AnyEventLog>) -> Self {
108 Self {
109 event_log,
110 topic: Topic::new(SESSIONS_TOPIC).expect("sessions topic is valid"),
111 projection: Arc::new(Mutex::new(SessionProjection::default())),
112 }
113 }
114
115 pub async fn create(&self, request: CreateSession) -> Result<Session, SessionError> {
116 let created_at = request.created_at.unwrap_or_else(OffsetDateTime::now_utc);
117 let id = match request.id {
118 Some(id) => validate_session_id(id)?,
119 None => generate_session_token(),
120 };
121 let token_handle = session_token_handle(&id)?;
122 let principal = validate_principal(request.principal)?;
123 if request.expires_at <= created_at {
124 return Err(SessionError::Invalid(
125 "expires_at must be later than created_at".to_string(),
126 ));
127 }
128
129 self.refresh().await?;
130 if self.has_seen_handle(&token_handle) {
131 return Err(SessionError::Duplicate(id));
132 }
133
134 let record = SessionRecord {
135 token_handle,
136 principal,
137 created_at,
138 last_seen_at: created_at,
139 expires_at: request.expires_at,
140 attributes: request.attributes,
141 };
142 let created_event_id = self
143 .append(
144 SESSION_CREATED_KIND,
145 serde_json::to_value(SessionCreatedPayload {
146 record: record.clone(),
147 })?,
148 )
149 .await?;
150 self.refresh().await?;
151 if !self.created_by_event(&record.token_handle, created_event_id) {
152 return Err(SessionError::Duplicate(id));
153 }
154 Ok(record.to_session(id))
155 }
156
157 pub async fn get(
158 &self,
159 id: &str,
160 now: OffsetDateTime,
161 ) -> Result<Option<Session>, SessionError> {
162 let id = validate_session_id(id)?;
163 let token_handle = session_token_handle(&id)?;
164 self.refresh().await?;
165 Ok(self.projected_session(&token_handle, id, now))
166 }
167
168 pub async fn touch(&self, request: TouchSession) -> Result<Option<Session>, SessionError> {
169 let id = validate_session_id(request.id)?;
170 let token_handle = session_token_handle(&id)?;
171 self.refresh().await?;
172 if self
173 .projected_session(&token_handle, id.clone(), request.last_seen_at)
174 .is_none()
175 {
176 return Ok(None);
177 }
178 if let Some(expires_at) = request.expires_at {
179 if expires_at <= request.last_seen_at {
180 return Err(SessionError::Invalid(
181 "expires_at must be later than last_seen_at".to_string(),
182 ));
183 }
184 }
185
186 self.append(
187 SESSION_TOUCHED_KIND,
188 serde_json::to_value(SessionTouchedPayload {
189 token_handle: token_handle.clone(),
190 last_seen_at: request.last_seen_at,
191 expires_at: request.expires_at,
192 attributes: request.attributes,
193 })?,
194 )
195 .await?;
196 self.refresh().await?;
197 Ok(self.projected_session(&token_handle, id, request.last_seen_at))
198 }
199
200 pub async fn expire(&self, request: ExpireSession) -> Result<Option<Session>, SessionError> {
201 let id = validate_session_id(request.id)?;
202 let token_handle = session_token_handle(&id)?;
203 self.refresh().await?;
204 if !self.has_seen_handle(&token_handle) {
205 return Ok(None);
206 }
207 self.append(
208 SESSION_EXPIRED_KIND,
209 serde_json::to_value(SessionExpiredPayload {
210 token_handle: token_handle.clone(),
211 expired_at: request.expired_at,
212 })?,
213 )
214 .await?;
215 self.refresh().await?;
216 Ok(self
217 .projection
218 .lock()
219 .expect("session projection poisoned")
220 .sessions
221 .get(&token_handle)
222 .map(|entry| entry.record.to_session(id)))
223 }
224
225 async fn append(
226 &self,
227 kind: &'static str,
228 payload: serde_json::Value,
229 ) -> Result<EventId, SessionError> {
230 self.event_log
231 .append(&self.topic, LogEvent::new(kind, payload))
232 .await
233 .map_err(SessionError::from)
234 }
235
236 async fn refresh(&self) -> Result<(), SessionError> {
237 let from = self
238 .projection
239 .lock()
240 .expect("session projection poisoned")
241 .last_event_id;
242 let events = self
243 .event_log
244 .read_range(&self.topic, from, usize::MAX)
245 .await?;
246 if events.is_empty() {
247 return Ok(());
248 }
249
250 let mut projection = self.projection.lock().expect("session projection poisoned");
251 for (event_id, event) in events {
252 if projection
253 .last_event_id
254 .is_some_and(|last_event_id| event_id <= last_event_id)
255 {
256 continue;
257 }
258 apply_event(&mut projection, event_id, event)?;
259 projection.last_event_id = Some(event_id);
260 }
261 Ok(())
262 }
263
264 fn has_seen_handle(&self, token_handle: &str) -> bool {
265 self.projection
266 .lock()
267 .expect("session projection poisoned")
268 .seen_handles
269 .contains(token_handle)
270 }
271
272 fn created_by_event(&self, token_handle: &str, event_id: EventId) -> bool {
273 self.projection
274 .lock()
275 .expect("session projection poisoned")
276 .sessions
277 .get(token_handle)
278 .is_some_and(|entry| entry.created_event_id == event_id)
279 }
280
281 fn projected_session(
282 &self,
283 token_handle: &str,
284 id: impl Into<String>,
285 now: OffsetDateTime,
286 ) -> Option<Session> {
287 let id = id.into();
288 self.projection
289 .lock()
290 .expect("session projection poisoned")
291 .sessions
292 .get(token_handle)
293 .filter(|entry| !entry.expired)
294 .map(|entry| &entry.record)
295 .filter(|record| !record.is_expired_at(now))
296 .map(|record| record.to_session(id))
297 }
298}
299
300pub async fn create(store: &SessionStore, request: CreateSession) -> Result<Session, SessionError> {
301 store.create(request).await
302}
303
304pub async fn get(
305 store: &SessionStore,
306 id: &str,
307 now: OffsetDateTime,
308) -> Result<Option<Session>, SessionError> {
309 store.get(id, now).await
310}
311
312pub async fn touch(
313 store: &SessionStore,
314 request: TouchSession,
315) -> Result<Option<Session>, SessionError> {
316 store.touch(request).await
317}
318
319pub async fn expire(
320 store: &SessionStore,
321 request: ExpireSession,
322) -> Result<Option<Session>, SessionError> {
323 store.expire(request).await
324}
325
326#[derive(Debug, Clone, PartialEq, Eq)]
327pub enum SessionError {
328 Duplicate(String),
329 Invalid(String),
330 Log(String),
331 Serde(String),
332}
333
334impl fmt::Display for SessionError {
335 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336 match self {
337 Self::Duplicate(id) => write!(f, "session '{id}' already exists"),
338 Self::Invalid(message) | Self::Log(message) | Self::Serde(message) => message.fmt(f),
339 }
340 }
341}
342
343impl std::error::Error for SessionError {}
344
345impl From<LogError> for SessionError {
346 fn from(value: LogError) -> Self {
347 Self::Log(value.to_string())
348 }
349}
350
351impl From<serde_json::Error> for SessionError {
352 fn from(value: serde_json::Error) -> Self {
353 Self::Serde(value.to_string())
354 }
355}
356
357#[derive(Default)]
358struct SessionProjection {
359 sessions: BTreeMap<String, ProjectedSession>,
360 seen_handles: BTreeSet<String>,
361 last_event_id: Option<EventId>,
362}
363
364struct ProjectedSession {
365 record: SessionRecord,
366 expired: bool,
367 created_event_id: EventId,
368}
369
370#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
371struct SessionRecord {
372 token_handle: String,
373 principal: String,
374 #[serde(with = "time::serde::rfc3339")]
375 created_at: OffsetDateTime,
376 #[serde(with = "time::serde::rfc3339")]
377 last_seen_at: OffsetDateTime,
378 #[serde(with = "time::serde::rfc3339")]
379 expires_at: OffsetDateTime,
380 #[serde(default)]
381 attributes: SessionAttributes,
382}
383
384impl SessionRecord {
385 fn is_expired_at(&self, now: OffsetDateTime) -> bool {
386 self.expires_at <= now
387 }
388
389 fn to_session(&self, id: impl Into<String>) -> Session {
390 Session {
391 id: id.into(),
392 principal: self.principal.clone(),
393 created_at: self.created_at,
394 last_seen_at: self.last_seen_at,
395 expires_at: self.expires_at,
396 attributes: self.attributes.clone(),
397 }
398 }
399}
400
401#[derive(Serialize, Deserialize)]
402struct SessionCreatedPayload {
403 record: SessionRecord,
404}
405
406#[derive(Serialize, Deserialize)]
407struct SessionTouchedPayload {
408 token_handle: String,
409 #[serde(with = "time::serde::rfc3339")]
410 last_seen_at: OffsetDateTime,
411 #[serde(default, with = "time::serde::rfc3339::option")]
412 expires_at: Option<OffsetDateTime>,
413 #[serde(default)]
414 attributes: Option<SessionAttributes>,
415}
416
417#[derive(Serialize, Deserialize)]
418struct SessionExpiredPayload {
419 token_handle: String,
420 #[serde(with = "time::serde::rfc3339")]
421 expired_at: OffsetDateTime,
422}
423
424fn apply_event(
425 projection: &mut SessionProjection,
426 event_id: EventId,
427 event: LogEvent,
428) -> Result<(), SessionError> {
429 match event.kind.as_str() {
430 SESSION_CREATED_KIND => {
431 let payload: SessionCreatedPayload = serde_json::from_value(event.payload)?;
432 projection
433 .seen_handles
434 .insert(payload.record.token_handle.clone());
435 projection
436 .sessions
437 .entry(payload.record.token_handle.clone())
438 .or_insert_with(|| ProjectedSession {
439 record: payload.record,
440 expired: false,
441 created_event_id: event_id,
442 });
443 }
444 SESSION_TOUCHED_KIND => {
445 let payload: SessionTouchedPayload = serde_json::from_value(event.payload)?;
446 if let Some(entry) = projection.sessions.get_mut(&payload.token_handle) {
447 if !entry.expired {
448 entry.record.last_seen_at = entry.record.last_seen_at.max(payload.last_seen_at);
449 if let Some(expires_at) = payload.expires_at {
450 entry.record.expires_at = expires_at;
451 }
452 if let Some(attributes) = payload.attributes {
453 entry.record.attributes = attributes;
454 }
455 }
456 }
457 }
458 SESSION_EXPIRED_KIND => {
459 let payload: SessionExpiredPayload = serde_json::from_value(event.payload)?;
460 projection.seen_handles.insert(payload.token_handle.clone());
461 if let Some(entry) = projection.sessions.get_mut(&payload.token_handle) {
462 entry.expired = true;
463 entry.record.expires_at = entry.record.expires_at.min(payload.expired_at);
464 }
465 }
466 _ => {}
467 }
468 Ok(())
469}
470
471fn validate_session_id(id: impl Into<String>) -> Result<String, SessionError> {
472 let id = id.into();
473 let trimmed = id.trim();
474 if trimmed.is_empty() {
475 return Err(SessionError::Invalid(
476 "session id cannot be empty".to_string(),
477 ));
478 }
479 if trimmed.chars().count() < MIN_SESSION_TOKEN_CHARS {
480 return Err(SessionError::Invalid(format!(
481 "session id must be at least {MIN_SESSION_TOKEN_CHARS} characters"
482 )));
483 }
484 Ok(trimmed.to_string())
485}
486
487fn validate_principal(principal: impl Into<String>) -> Result<String, SessionError> {
488 let principal = principal.into();
489 let trimmed = principal.trim();
490 if trimmed.is_empty() {
491 return Err(SessionError::Invalid(
492 "session principal cannot be empty".to_string(),
493 ));
494 }
495 Ok(trimmed.to_string())
496}
497
498fn generate_session_token() -> String {
499 let random: [u8; TOKEN_RANDOM_BYTES] = rand::random();
500 let token = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(random);
501 format!("{GENERATED_TOKEN_PREFIX}{token}")
502}
503
504fn session_token_handle(id: &str) -> Result<String, SessionError> {
505 let id = validate_session_id(id)?;
506 let mut hasher = Sha256::new();
507 hasher.update(b"harn.orchestrator.session-token.v1\0");
508 hasher.update(id.as_bytes());
509 Ok(format!(
510 "{TOKEN_HANDLE_PREFIX}{}",
511 hex::encode(hasher.finalize())
512 ))
513}
514
515#[cfg(test)]
516mod tests {
517 use std::sync::Arc;
518
519 use tempfile::tempdir;
520 use time::format_description::well_known::Rfc3339;
521
522 use super::*;
523 use crate::event_log::{FileEventLog, MemoryEventLog};
524
525 const SESSION_ID: &str = "harn_sess_test_abcdefghijklmnopqrstuvwxyz0123456789";
526
527 fn at(raw: &str) -> OffsetDateTime {
528 OffsetDateTime::parse(raw, &Rfc3339).expect("valid rfc3339 timestamp")
529 }
530
531 fn memory_log() -> Arc<AnyEventLog> {
532 Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)))
533 }
534
535 fn memory_store() -> SessionStore {
536 SessionStore::new(memory_log())
537 }
538
539 #[tokio::test]
540 async fn lifecycle_create_touch_expire_round_trip() {
541 let store = memory_store();
542 let created_at = at("2026-05-02T12:00:00Z");
543 let touched_at = at("2026-05-02T12:30:00Z");
544 let expires_at = at("2026-05-02T13:00:00Z");
545 let mut attributes = BTreeMap::new();
546 attributes.insert("role".to_string(), serde_json::json!("operator"));
547
548 let created = store
549 .create(CreateSession {
550 id: Some(SESSION_ID.to_string()),
551 principal: "user-1".to_string(),
552 created_at: Some(created_at),
553 expires_at,
554 attributes: attributes.clone(),
555 })
556 .await
557 .expect("create session");
558
559 assert_eq!(created.id, SESSION_ID);
560 assert_eq!(created.last_seen_at, created_at);
561 assert_eq!(created.attributes, attributes);
562 assert_eq!(
563 store
564 .get(SESSION_ID, created_at)
565 .await
566 .expect("get session")
567 .expect("session exists"),
568 created
569 );
570
571 let touched = store
572 .touch(TouchSession::new(SESSION_ID, touched_at))
573 .await
574 .expect("touch session")
575 .expect("active session");
576 assert_eq!(touched.last_seen_at, touched_at);
577 assert_eq!(touched.expires_at, expires_at);
578
579 let expired = store
580 .expire(ExpireSession::new(SESSION_ID, touched_at))
581 .await
582 .expect("expire session")
583 .expect("known session");
584 assert!(expired.is_expired_at(touched_at));
585 assert!(store
586 .get(SESSION_ID, touched_at)
587 .await
588 .expect("get expired session")
589 .is_none());
590 }
591
592 #[tokio::test]
593 async fn store_projection_replays_persisted_event_log() {
594 let dir = tempdir().expect("tempdir");
595 {
596 let event_log = Arc::new(AnyEventLog::File(
597 FileEventLog::open(dir.path().join("events"), 32).expect("open event log"),
598 ));
599 let first = SessionStore::new(event_log.clone());
600 first
601 .create(CreateSession {
602 id: Some(SESSION_ID.to_string()),
603 principal: "user-1".to_string(),
604 created_at: Some(at("2026-05-02T12:00:00Z")),
605 expires_at: at("2026-05-02T13:00:00Z"),
606 attributes: BTreeMap::new(),
607 })
608 .await
609 .expect("create session");
610 }
611
612 let reopened_log = Arc::new(AnyEventLog::File(
613 FileEventLog::open(dir.path().join("events"), 32).expect("reopen event log"),
614 ));
615 let restored = SessionStore::new(reopened_log);
616 let session = restored
617 .get(SESSION_ID, at("2026-05-02T12:30:00Z"))
618 .await
619 .expect("get restored session")
620 .expect("session restored");
621 assert_eq!(session.principal, "user-1");
622 }
623
624 #[tokio::test]
625 async fn event_log_payloads_do_not_persist_raw_bearer_tokens() {
626 let event_log = memory_log();
627 let store = SessionStore::new(event_log.clone());
628 store
629 .create(CreateSession {
630 id: Some(SESSION_ID.to_string()),
631 principal: "user-1".to_string(),
632 created_at: Some(at("2026-05-02T12:00:00Z")),
633 expires_at: at("2026-05-02T13:00:00Z"),
634 attributes: BTreeMap::new(),
635 })
636 .await
637 .expect("create session");
638 store
639 .touch(TouchSession::new(SESSION_ID, at("2026-05-02T12:30:00Z")))
640 .await
641 .expect("touch session");
642 store
643 .expire(ExpireSession::new(SESSION_ID, at("2026-05-02T12:45:00Z")))
644 .await
645 .expect("expire session");
646
647 let events = event_log
648 .read_range(
649 &Topic::new(SESSIONS_TOPIC).expect("sessions topic"),
650 None,
651 usize::MAX,
652 )
653 .await
654 .expect("read session log");
655 let payloads = events
656 .into_iter()
657 .map(|(_, event)| event.payload.to_string())
658 .collect::<Vec<_>>()
659 .join("\n");
660 assert!(!payloads.contains(SESSION_ID));
661 assert!(payloads.contains(TOKEN_HANDLE_PREFIX));
662 }
663
664 #[tokio::test]
665 async fn expired_sessions_cannot_be_touched() {
666 let store = memory_store();
667 store
668 .create(CreateSession {
669 id: Some(SESSION_ID.to_string()),
670 principal: "user-1".to_string(),
671 created_at: Some(at("2026-05-02T12:00:00Z")),
672 expires_at: at("2026-05-02T12:05:00Z"),
673 attributes: BTreeMap::new(),
674 })
675 .await
676 .expect("create session");
677
678 assert!(store
679 .touch(TouchSession::new(SESSION_ID, at("2026-05-02T12:06:00Z")))
680 .await
681 .expect("touch expired session")
682 .is_none());
683 }
684
685 #[tokio::test]
686 async fn duplicate_session_ids_are_rejected() {
687 let store = memory_store();
688 let request = CreateSession {
689 id: Some(SESSION_ID.to_string()),
690 principal: "user-1".to_string(),
691 created_at: Some(at("2026-05-02T12:00:00Z")),
692 expires_at: at("2026-05-02T13:00:00Z"),
693 attributes: BTreeMap::new(),
694 };
695 store.create(request.clone()).await.expect("first create");
696
697 let err = store.create(request).await.expect_err("duplicate rejected");
698 assert_eq!(err, SessionError::Duplicate(SESSION_ID.to_string()));
699 }
700
701 #[tokio::test]
702 async fn concurrent_duplicate_creates_return_one_winner() {
703 let event_log = memory_log();
704 let first = SessionStore::new(event_log.clone());
705 let second = SessionStore::new(event_log);
706 let request = CreateSession {
707 id: Some(SESSION_ID.to_string()),
708 principal: "user-1".to_string(),
709 created_at: Some(at("2026-05-02T12:00:00Z")),
710 expires_at: at("2026-05-02T13:00:00Z"),
711 attributes: BTreeMap::new(),
712 };
713
714 let (left, right) = tokio::join!(first.create(request.clone()), second.create(request));
715 let results = [left, right];
716 assert_eq!(results.iter().filter(|result| result.is_ok()).count(), 1);
717 assert_eq!(
718 results
719 .iter()
720 .filter(
721 |result| matches!(result, Err(SessionError::Duplicate(id)) if id == SESSION_ID)
722 )
723 .count(),
724 1
725 );
726 }
727
728 #[tokio::test]
729 async fn generated_session_ids_are_high_entropy_tokens() {
730 let store = memory_store();
731 let session = store
732 .create(CreateSession {
733 id: None,
734 principal: "user-1".to_string(),
735 created_at: Some(at("2026-05-02T12:00:00Z")),
736 expires_at: at("2026-05-02T13:00:00Z"),
737 attributes: BTreeMap::new(),
738 })
739 .await
740 .expect("create generated session");
741
742 assert!(session.id.starts_with(GENERATED_TOKEN_PREFIX));
743 assert!(session.id.len() >= MIN_SESSION_TOKEN_CHARS);
744 }
745}