Skip to main content

harn_vm/sessions/
mod.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt;
3use std::sync::{Arc, Mutex};
4
5use base64::Engine as _;
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use time::OffsetDateTime;
9
10use crate::event_log::{AnyEventLog, EventId, EventLog, LogError, LogEvent, Topic};
11
/// Name of the event-log topic that [`SessionStore`] appends to and replays.
pub const SESSIONS_TOPIC: &str = "orchestrator.sessions";

// Event `kind` strings written by `SessionStore` and matched in `apply_event`.
const SESSION_CREATED_KIND: &str = "session_created";
const SESSION_TOUCHED_KIND: &str = "session_touched";
const SESSION_EXPIRED_KIND: &str = "session_expired";
// Prefix placed in front of tokens produced by `generate_session_token`.
const GENERATED_TOKEN_PREFIX: &str = "harn_sess_";
// Minimum length, counted in `char`s after trimming, accepted by `validate_session_id`.
const MIN_SESSION_TOKEN_CHARS: usize = 32;
// Number of random bytes encoded into a generated token.
const TOKEN_RANDOM_BYTES: usize = 32;
// Prefix placed in front of the hex digest built by `session_token_handle`.
const TOKEN_HANDLE_PREFIX: &str = "sha256:v1:";

/// Per-session metadata: string keys mapped to arbitrary JSON values.
pub type SessionAttributes = BTreeMap<String, serde_json::Value>;
23
/// Caller-facing snapshot of a session, as returned by [`SessionStore`] operations.
///
/// All timestamps are (de)serialized as RFC 3339 strings.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Session {
    /// The session id this snapshot was created or looked up with.
    pub id: String,
    /// Identity the session was created for.
    pub principal: String,
    /// When the session was created.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Most recent activity time recorded for the session.
    #[serde(with = "time::serde::rfc3339")]
    pub last_seen_at: OffsetDateTime,
    /// Instant from which the session counts as expired (see [`Session::is_expired_at`]).
    #[serde(with = "time::serde::rfc3339")]
    pub expires_at: OffsetDateTime,
    /// Arbitrary metadata; a missing field deserializes to an empty map.
    #[serde(default)]
    pub attributes: SessionAttributes,
}
37
38impl Session {
39    pub fn is_expired_at(&self, now: OffsetDateTime) -> bool {
40        self.expires_at <= now
41    }
42}
43
/// Request to create a new session via [`SessionStore::create`].
#[derive(Clone, Debug, PartialEq)]
pub struct CreateSession {
    /// Caller-chosen session id; when `None` the store generates a random token.
    pub id: Option<String>,
    /// Identity the session belongs to; must not be blank.
    pub principal: String,
    /// Creation time; when `None` the store uses the current UTC time.
    pub created_at: Option<OffsetDateTime>,
    /// Expiry instant; must be strictly later than the creation time.
    pub expires_at: OffsetDateTime,
    /// Initial metadata stored with the session.
    pub attributes: SessionAttributes,
}
52
53impl CreateSession {
54    pub fn new(principal: impl Into<String>, expires_at: OffsetDateTime) -> Self {
55        Self {
56            id: None,
57            principal: principal.into(),
58            created_at: None,
59            expires_at,
60            attributes: BTreeMap::new(),
61        }
62    }
63}
64
/// Request to record activity on an existing session via [`SessionStore::touch`].
#[derive(Clone, Debug, PartialEq)]
pub struct TouchSession {
    /// Id of the session to touch.
    pub id: String,
    /// Activity time to record; also the instant used to decide whether the
    /// session is still active.
    pub last_seen_at: OffsetDateTime,
    /// Replacement expiry; `None` leaves the current expiry untouched.
    pub expires_at: Option<OffsetDateTime>,
    /// Replacement attribute map; `None` leaves the current attributes untouched.
    pub attributes: Option<SessionAttributes>,
}
72
73impl TouchSession {
74    pub fn new(id: impl Into<String>, last_seen_at: OffsetDateTime) -> Self {
75        Self {
76            id: id.into(),
77            last_seen_at,
78            expires_at: None,
79            attributes: None,
80        }
81    }
82}
83
/// Request to explicitly expire a session via [`SessionStore::expire`].
#[derive(Clone, Debug, PartialEq)]
pub struct ExpireSession {
    /// Id of the session to expire.
    pub id: String,
    /// Instant recorded as the expiry time in the appended event.
    pub expired_at: OffsetDateTime,
}
89
90impl ExpireSession {
91    pub fn new(id: impl Into<String>, expired_at: OffsetDateTime) -> Self {
92        Self {
93            id: id.into(),
94            expired_at,
95        }
96    }
97}
98
/// Session store backed by an event log.
///
/// Every change is appended to the sessions topic, and reads are answered from
/// an in-memory projection that is brought up to date by replaying that topic.
/// Clones share the same event log and the same projection.
#[derive(Clone)]
pub struct SessionStore {
    // Log that session events are appended to and read back from.
    event_log: Arc<AnyEventLog>,
    // Topic within the log that holds session events.
    topic: Topic,
    // Cached state derived from the events applied so far.
    projection: Arc<Mutex<SessionProjection>>,
}
105
106impl SessionStore {
107    pub fn new(event_log: Arc<AnyEventLog>) -> Self {
108        Self {
109            event_log,
110            topic: Topic::new(SESSIONS_TOPIC).expect("sessions topic is valid"),
111            projection: Arc::new(Mutex::new(SessionProjection::default())),
112        }
113    }
114
115    pub async fn create(&self, request: CreateSession) -> Result<Session, SessionError> {
116        let created_at = request.created_at.unwrap_or_else(OffsetDateTime::now_utc);
117        let id = match request.id {
118            Some(id) => validate_session_id(id)?,
119            None => generate_session_token(),
120        };
121        let token_handle = session_token_handle(&id)?;
122        let principal = validate_principal(request.principal)?;
123        if request.expires_at <= created_at {
124            return Err(SessionError::Invalid(
125                "expires_at must be later than created_at".to_string(),
126            ));
127        }
128
129        self.refresh().await?;
130        if self.has_seen_handle(&token_handle) {
131            return Err(SessionError::Duplicate(id));
132        }
133
134        let record = SessionRecord {
135            token_handle,
136            principal,
137            created_at,
138            last_seen_at: created_at,
139            expires_at: request.expires_at,
140            attributes: request.attributes,
141        };
142        let created_event_id = self
143            .append(
144                SESSION_CREATED_KIND,
145                serde_json::to_value(SessionCreatedPayload {
146                    record: record.clone(),
147                })?,
148            )
149            .await?;
150        self.refresh().await?;
151        if !self.created_by_event(&record.token_handle, created_event_id) {
152            return Err(SessionError::Duplicate(id));
153        }
154        Ok(record.to_session(id))
155    }
156
157    pub async fn get(
158        &self,
159        id: &str,
160        now: OffsetDateTime,
161    ) -> Result<Option<Session>, SessionError> {
162        let id = validate_session_id(id)?;
163        let token_handle = session_token_handle(&id)?;
164        self.refresh().await?;
165        Ok(self.projected_session(&token_handle, id, now))
166    }
167
168    pub async fn touch(&self, request: TouchSession) -> Result<Option<Session>, SessionError> {
169        let id = validate_session_id(request.id)?;
170        let token_handle = session_token_handle(&id)?;
171        self.refresh().await?;
172        if self
173            .projected_session(&token_handle, id.clone(), request.last_seen_at)
174            .is_none()
175        {
176            return Ok(None);
177        }
178        if let Some(expires_at) = request.expires_at {
179            if expires_at <= request.last_seen_at {
180                return Err(SessionError::Invalid(
181                    "expires_at must be later than last_seen_at".to_string(),
182                ));
183            }
184        }
185
186        self.append(
187            SESSION_TOUCHED_KIND,
188            serde_json::to_value(SessionTouchedPayload {
189                token_handle: token_handle.clone(),
190                last_seen_at: request.last_seen_at,
191                expires_at: request.expires_at,
192                attributes: request.attributes,
193            })?,
194        )
195        .await?;
196        self.refresh().await?;
197        Ok(self.projected_session(&token_handle, id, request.last_seen_at))
198    }
199
200    pub async fn expire(&self, request: ExpireSession) -> Result<Option<Session>, SessionError> {
201        let id = validate_session_id(request.id)?;
202        let token_handle = session_token_handle(&id)?;
203        self.refresh().await?;
204        if !self.has_seen_handle(&token_handle) {
205            return Ok(None);
206        }
207        self.append(
208            SESSION_EXPIRED_KIND,
209            serde_json::to_value(SessionExpiredPayload {
210                token_handle: token_handle.clone(),
211                expired_at: request.expired_at,
212            })?,
213        )
214        .await?;
215        self.refresh().await?;
216        Ok(self
217            .projection
218            .lock()
219            .expect("session projection poisoned")
220            .sessions
221            .get(&token_handle)
222            .map(|entry| entry.record.to_session(id)))
223    }
224
225    async fn append(
226        &self,
227        kind: &'static str,
228        payload: serde_json::Value,
229    ) -> Result<EventId, SessionError> {
230        self.event_log
231            .append(&self.topic, LogEvent::new(kind, payload))
232            .await
233            .map_err(SessionError::from)
234    }
235
236    async fn refresh(&self) -> Result<(), SessionError> {
237        let from = self
238            .projection
239            .lock()
240            .expect("session projection poisoned")
241            .last_event_id;
242        let events = self
243            .event_log
244            .read_range(&self.topic, from, usize::MAX)
245            .await?;
246        if events.is_empty() {
247            return Ok(());
248        }
249
250        let mut projection = self.projection.lock().expect("session projection poisoned");
251        for (event_id, event) in events {
252            if projection
253                .last_event_id
254                .is_some_and(|last_event_id| event_id <= last_event_id)
255            {
256                continue;
257            }
258            apply_event(&mut projection, event_id, event)?;
259            projection.last_event_id = Some(event_id);
260        }
261        Ok(())
262    }
263
264    fn has_seen_handle(&self, token_handle: &str) -> bool {
265        self.projection
266            .lock()
267            .expect("session projection poisoned")
268            .seen_handles
269            .contains(token_handle)
270    }
271
272    fn created_by_event(&self, token_handle: &str, event_id: EventId) -> bool {
273        self.projection
274            .lock()
275            .expect("session projection poisoned")
276            .sessions
277            .get(token_handle)
278            .is_some_and(|entry| entry.created_event_id == event_id)
279    }
280
281    fn projected_session(
282        &self,
283        token_handle: &str,
284        id: impl Into<String>,
285        now: OffsetDateTime,
286    ) -> Option<Session> {
287        let id = id.into();
288        self.projection
289            .lock()
290            .expect("session projection poisoned")
291            .sessions
292            .get(token_handle)
293            .filter(|entry| !entry.expired)
294            .map(|entry| &entry.record)
295            .filter(|record| !record.is_expired_at(now))
296            .map(|record| record.to_session(id))
297    }
298}
299
300pub async fn create(store: &SessionStore, request: CreateSession) -> Result<Session, SessionError> {
301    store.create(request).await
302}
303
304pub async fn get(
305    store: &SessionStore,
306    id: &str,
307    now: OffsetDateTime,
308) -> Result<Option<Session>, SessionError> {
309    store.get(id, now).await
310}
311
312pub async fn touch(
313    store: &SessionStore,
314    request: TouchSession,
315) -> Result<Option<Session>, SessionError> {
316    store.touch(request).await
317}
318
319pub async fn expire(
320    store: &SessionStore,
321    request: ExpireSession,
322) -> Result<Option<Session>, SessionError> {
323    store.expire(request).await
324}
325
/// Errors returned by session operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SessionError {
    /// A session with this id already exists. Carries the session id; note that
    /// the `Display` output embeds that id verbatim.
    Duplicate(String),
    /// The request failed validation; carries a human-readable reason.
    Invalid(String),
    /// The event log reported an error; carries its message text.
    Log(String),
    /// JSON (de)serialization of an event payload failed; carries the message text.
    Serde(String),
}
333
334impl fmt::Display for SessionError {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
336        match self {
337            Self::Duplicate(id) => write!(f, "session '{id}' already exists"),
338            Self::Invalid(message) | Self::Log(message) | Self::Serde(message) => message.fmt(f),
339        }
340    }
341}
342
// Marker impl with no overrides; every variant carries only a message string.
impl std::error::Error for SessionError {}
344
345impl From<LogError> for SessionError {
346    fn from(value: LogError) -> Self {
347        Self::Log(value.to_string())
348    }
349}
350
351impl From<serde_json::Error> for SessionError {
352    fn from(value: serde_json::Error) -> Self {
353        Self::Serde(value.to_string())
354    }
355}
356
/// In-memory state derived from the session events applied so far.
#[derive(Default)]
struct SessionProjection {
    /// Projected sessions keyed by token handle.
    sessions: BTreeMap<String, ProjectedSession>,
    /// Every token handle that appeared in a "created" or "expired" event.
    seen_handles: BTreeSet<String>,
    /// Id of the last event applied; used as the cursor for the next log read.
    last_event_id: Option<EventId>,
}
363
/// One session as tracked inside the projection.
struct ProjectedSession {
    /// Current record after all applied events.
    record: SessionRecord,
    /// Set once an "expired" event for this handle has been applied.
    expired: bool,
    /// Id of the "created" event that first introduced this handle.
    created_event_id: EventId,
}
369
/// Session state as written to the event log and held in the projection.
///
/// It is identified by `token_handle` (see `session_token_handle`) rather than
/// by the session id itself. Timestamps are (de)serialized as RFC 3339 strings.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
struct SessionRecord {
    /// Handle derived from the session id by `session_token_handle`.
    token_handle: String,
    /// Identity the session belongs to.
    principal: String,
    /// When the session was created.
    #[serde(with = "time::serde::rfc3339")]
    created_at: OffsetDateTime,
    /// Most recent activity time recorded for the session.
    #[serde(with = "time::serde::rfc3339")]
    last_seen_at: OffsetDateTime,
    /// Instant from which the session counts as expired.
    #[serde(with = "time::serde::rfc3339")]
    expires_at: OffsetDateTime,
    /// Arbitrary metadata; a missing field deserializes to an empty map.
    #[serde(default)]
    attributes: SessionAttributes,
}
383
384impl SessionRecord {
385    fn is_expired_at(&self, now: OffsetDateTime) -> bool {
386        self.expires_at <= now
387    }
388
389    fn to_session(&self, id: impl Into<String>) -> Session {
390        Session {
391            id: id.into(),
392            principal: self.principal.clone(),
393            created_at: self.created_at,
394            last_seen_at: self.last_seen_at,
395            expires_at: self.expires_at,
396            attributes: self.attributes.clone(),
397        }
398    }
399}
400
/// JSON payload of a "session_created" event.
#[derive(Serialize, Deserialize)]
struct SessionCreatedPayload {
    /// Full initial record of the new session.
    record: SessionRecord,
}
405
/// JSON payload of a "session_touched" event.
#[derive(Serialize, Deserialize)]
struct SessionTouchedPayload {
    /// Handle of the session being touched.
    token_handle: String,
    /// Activity time to record.
    #[serde(with = "time::serde::rfc3339")]
    last_seen_at: OffsetDateTime,
    /// Replacement expiry, if any; a missing field deserializes to `None`.
    #[serde(default, with = "time::serde::rfc3339::option")]
    expires_at: Option<OffsetDateTime>,
    /// Replacement attribute map, if any; a missing field deserializes to `None`.
    #[serde(default)]
    attributes: Option<SessionAttributes>,
}
416
/// JSON payload of a "session_expired" event.
#[derive(Serialize, Deserialize)]
struct SessionExpiredPayload {
    /// Handle of the session being expired.
    token_handle: String,
    /// Instant the session was declared expired.
    #[serde(with = "time::serde::rfc3339")]
    expired_at: OffsetDateTime,
}
423
424fn apply_event(
425    projection: &mut SessionProjection,
426    event_id: EventId,
427    event: LogEvent,
428) -> Result<(), SessionError> {
429    match event.kind.as_str() {
430        SESSION_CREATED_KIND => {
431            let payload: SessionCreatedPayload = serde_json::from_value(event.payload)?;
432            projection
433                .seen_handles
434                .insert(payload.record.token_handle.clone());
435            projection
436                .sessions
437                .entry(payload.record.token_handle.clone())
438                .or_insert_with(|| ProjectedSession {
439                    record: payload.record,
440                    expired: false,
441                    created_event_id: event_id,
442                });
443        }
444        SESSION_TOUCHED_KIND => {
445            let payload: SessionTouchedPayload = serde_json::from_value(event.payload)?;
446            if let Some(entry) = projection.sessions.get_mut(&payload.token_handle) {
447                if !entry.expired {
448                    entry.record.last_seen_at = entry.record.last_seen_at.max(payload.last_seen_at);
449                    if let Some(expires_at) = payload.expires_at {
450                        entry.record.expires_at = expires_at;
451                    }
452                    if let Some(attributes) = payload.attributes {
453                        entry.record.attributes = attributes;
454                    }
455                }
456            }
457        }
458        SESSION_EXPIRED_KIND => {
459            let payload: SessionExpiredPayload = serde_json::from_value(event.payload)?;
460            projection.seen_handles.insert(payload.token_handle.clone());
461            if let Some(entry) = projection.sessions.get_mut(&payload.token_handle) {
462                entry.expired = true;
463                entry.record.expires_at = entry.record.expires_at.min(payload.expired_at);
464            }
465        }
466        _ => {}
467    }
468    Ok(())
469}
470
471fn validate_session_id(id: impl Into<String>) -> Result<String, SessionError> {
472    let id = id.into();
473    let trimmed = id.trim();
474    if trimmed.is_empty() {
475        return Err(SessionError::Invalid(
476            "session id cannot be empty".to_string(),
477        ));
478    }
479    if trimmed.chars().count() < MIN_SESSION_TOKEN_CHARS {
480        return Err(SessionError::Invalid(format!(
481            "session id must be at least {MIN_SESSION_TOKEN_CHARS} characters"
482        )));
483    }
484    Ok(trimmed.to_string())
485}
486
487fn validate_principal(principal: impl Into<String>) -> Result<String, SessionError> {
488    let principal = principal.into();
489    let trimmed = principal.trim();
490    if trimmed.is_empty() {
491        return Err(SessionError::Invalid(
492            "session principal cannot be empty".to_string(),
493        ));
494    }
495    Ok(trimmed.to_string())
496}
497
498fn generate_session_token() -> String {
499    let random: [u8; TOKEN_RANDOM_BYTES] = rand::random();
500    let token = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(random);
501    format!("{GENERATED_TOKEN_PREFIX}{token}")
502}
503
504fn session_token_handle(id: &str) -> Result<String, SessionError> {
505    let id = validate_session_id(id)?;
506    let mut hasher = Sha256::new();
507    hasher.update(b"harn.orchestrator.session-token.v1\0");
508    hasher.update(id.as_bytes());
509    Ok(format!(
510        "{TOKEN_HANDLE_PREFIX}{}",
511        hex::encode(hasher.finalize())
512    ))
513}
514
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use tempfile::tempdir;
    use time::format_description::well_known::Rfc3339;

    use super::*;
    use crate::event_log::{FileEventLog, MemoryEventLog};

    const SESSION_ID: &str = "harn_sess_test_abcdefghijklmnopqrstuvwxyz0123456789";
    const CREATED_AT: &str = "2026-05-02T12:00:00Z";
    const EXPIRES_AT: &str = "2026-05-02T13:00:00Z";

    /// Parses an RFC 3339 timestamp literal used by the tests.
    fn at(raw: &str) -> OffsetDateTime {
        OffsetDateTime::parse(raw, &Rfc3339).expect("valid rfc3339 timestamp")
    }

    fn memory_log() -> Arc<AnyEventLog> {
        Arc::new(AnyEventLog::Memory(MemoryEventLog::new(32)))
    }

    fn memory_store() -> SessionStore {
        SessionStore::new(memory_log())
    }

    /// Request shared by most tests: the fixed id and principal, created at
    /// `CREATED_AT`, with no attributes. Tests override individual fields with
    /// struct-update syntax.
    fn create_request(expires_at: &str) -> CreateSession {
        CreateSession {
            id: Some(SESSION_ID.to_string()),
            principal: "user-1".to_string(),
            created_at: Some(at(CREATED_AT)),
            expires_at: at(expires_at),
            attributes: BTreeMap::new(),
        }
    }

    #[tokio::test]
    async fn lifecycle_create_touch_expire_round_trip() {
        let store = memory_store();
        let created_at = at(CREATED_AT);
        let touched_at = at("2026-05-02T12:30:00Z");
        let expires_at = at(EXPIRES_AT);
        let mut attributes = BTreeMap::new();
        attributes.insert("role".to_string(), serde_json::json!("operator"));

        let created = store
            .create(CreateSession {
                attributes: attributes.clone(),
                ..create_request(EXPIRES_AT)
            })
            .await
            .expect("create session");

        assert_eq!(created.id, SESSION_ID);
        assert_eq!(created.last_seen_at, created_at);
        assert_eq!(created.attributes, attributes);
        assert_eq!(
            store
                .get(SESSION_ID, created_at)
                .await
                .expect("get session")
                .expect("session exists"),
            created
        );

        let touched = store
            .touch(TouchSession::new(SESSION_ID, touched_at))
            .await
            .expect("touch session")
            .expect("active session");
        assert_eq!(touched.last_seen_at, touched_at);
        assert_eq!(touched.expires_at, expires_at);

        let expired = store
            .expire(ExpireSession::new(SESSION_ID, touched_at))
            .await
            .expect("expire session")
            .expect("known session");
        assert!(expired.is_expired_at(touched_at));
        assert!(store
            .get(SESSION_ID, touched_at)
            .await
            .expect("get expired session")
            .is_none());
    }

    #[tokio::test]
    async fn store_projection_replays_persisted_event_log() {
        let dir = tempdir().expect("tempdir");
        {
            let event_log = Arc::new(AnyEventLog::File(
                FileEventLog::open(dir.path().join("events"), 32).expect("open event log"),
            ));
            let first = SessionStore::new(event_log.clone());
            first
                .create(create_request(EXPIRES_AT))
                .await
                .expect("create session");
        }

        // A fresh store over a reopened log starts with an empty projection
        // and must rebuild the session purely from the persisted events.
        let reopened_log = Arc::new(AnyEventLog::File(
            FileEventLog::open(dir.path().join("events"), 32).expect("reopen event log"),
        ));
        let restored = SessionStore::new(reopened_log);
        let session = restored
            .get(SESSION_ID, at("2026-05-02T12:30:00Z"))
            .await
            .expect("get restored session")
            .expect("session restored");
        assert_eq!(session.principal, "user-1");
    }

    #[tokio::test]
    async fn event_log_payloads_do_not_persist_raw_bearer_tokens() {
        let event_log = memory_log();
        let store = SessionStore::new(event_log.clone());
        store
            .create(create_request(EXPIRES_AT))
            .await
            .expect("create session");
        store
            .touch(TouchSession::new(SESSION_ID, at("2026-05-02T12:30:00Z")))
            .await
            .expect("touch session");
        store
            .expire(ExpireSession::new(SESSION_ID, at("2026-05-02T12:45:00Z")))
            .await
            .expect("expire session");

        let events = event_log
            .read_range(
                &Topic::new(SESSIONS_TOPIC).expect("sessions topic"),
                None,
                usize::MAX,
            )
            .await
            .expect("read session log");
        let payloads = events
            .into_iter()
            .map(|(_, event)| event.payload.to_string())
            .collect::<Vec<_>>()
            .join("\n");
        assert!(!payloads.contains(SESSION_ID));
        assert!(payloads.contains(TOKEN_HANDLE_PREFIX));
    }

    #[tokio::test]
    async fn expired_sessions_cannot_be_touched() {
        let store = memory_store();
        store
            .create(create_request("2026-05-02T12:05:00Z"))
            .await
            .expect("create session");

        assert!(store
            .touch(TouchSession::new(SESSION_ID, at("2026-05-02T12:06:00Z")))
            .await
            .expect("touch expired session")
            .is_none());
    }

    #[tokio::test]
    async fn duplicate_session_ids_are_rejected() {
        let store = memory_store();
        let request = create_request(EXPIRES_AT);
        store.create(request.clone()).await.expect("first create");

        let err = store.create(request).await.expect_err("duplicate rejected");
        assert_eq!(err, SessionError::Duplicate(SESSION_ID.to_string()));
    }

    #[tokio::test]
    async fn concurrent_duplicate_creates_return_one_winner() {
        // Two stores with independent projections race over one shared log.
        let event_log = memory_log();
        let first = SessionStore::new(event_log.clone());
        let second = SessionStore::new(event_log);
        let request = create_request(EXPIRES_AT);

        let (left, right) = tokio::join!(first.create(request.clone()), second.create(request));
        let results = [left, right];
        assert_eq!(results.iter().filter(|result| result.is_ok()).count(), 1);
        assert_eq!(
            results
                .iter()
                .filter(
                    |result| matches!(result, Err(SessionError::Duplicate(id)) if id == SESSION_ID)
                )
                .count(),
            1
        );
    }

    #[tokio::test]
    async fn generated_session_ids_are_high_entropy_tokens() {
        let store = memory_store();
        let request = CreateSession {
            id: None,
            ..create_request(EXPIRES_AT)
        };
        let first = store
            .create(request.clone())
            .await
            .expect("create generated session");
        let second = store
            .create(request)
            .await
            .expect("create second generated session");

        // Unpadded base64 emits ceil(4n / 3) characters for n input bytes.
        let encoded_len = (TOKEN_RANDOM_BYTES * 4 + 2) / 3;
        for session in [&first, &second] {
            let random_part = session
                .id
                .strip_prefix(GENERATED_TOKEN_PREFIX)
                .expect("generated id carries the token prefix");
            assert_eq!(random_part.len(), encoded_len);
            assert!(random_part
                .chars()
                .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_'));
            assert!(session.id.len() >= MIN_SESSION_TOKEN_CHARS);
        }
        // A constant or reused token would satisfy the shape checks above.
        assert_ne!(first.id, second.id);
    }
}