Skip to main content

greentic_types/
session.rs

1//! Session identity and cursor helpers.
2
3use alloc::borrow::ToOwned;
4use alloc::string::String;
5
6#[cfg(feature = "schemars")]
7use schemars::JsonSchema;
8#[cfg(feature = "serde")]
9use serde::{Deserialize, Serialize};
10
11use crate::{FlowId, PackId, TenantCtx};
12
13use sha2::{Digest, Sha256};
14
15/// Unique key referencing a persisted session.
16#[derive(Clone, Debug, PartialEq, Eq, Hash)]
17#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
18#[cfg_attr(feature = "schemars", derive(JsonSchema))]
19#[cfg_attr(feature = "serde", serde(transparent))]
20pub struct SessionKey(pub String);
21
22impl SessionKey {
23    /// Returns the session key as a string slice.
24    pub fn as_str(&self) -> &str {
25        &self.0
26    }
27
28    /// Creates a new session key from the supplied string.
29    pub fn new(value: impl Into<String>) -> Self {
30        Self(value.into())
31    }
32
33    /// Generates a random session key using [`uuid`], when enabled.
34    #[cfg(feature = "uuid")]
35    pub fn generate() -> Self {
36        Self(uuid::Uuid::new_v4().to_string())
37    }
38}
39
40impl From<String> for SessionKey {
41    fn from(value: String) -> Self {
42        Self(value)
43    }
44}
45
46impl From<&str> for SessionKey {
47    fn from(value: &str) -> Self {
48        Self(value.to_owned())
49    }
50}
51
52impl core::fmt::Display for SessionKey {
53    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
54        f.write_str(self.as_str())
55    }
56}
57
58#[cfg(feature = "uuid")]
59impl From<uuid::Uuid> for SessionKey {
60    fn from(value: uuid::Uuid) -> Self {
61        Self(value.to_string())
62    }
63}
64
65const DEFAULT_CANONICAL_ANCHOR: &str = "conversation";
66const DEFAULT_CANONICAL_USER: &str = "user";
67
68/// Build the canonical `{tenant}:{provider}:{anchor}:{user}` session key.
69///
70/// All canonical adapters are expected to follow this format so pause/resume semantics remain
71/// deterministic across ingress providers. The anchor defaults to `conversation` and the user
72/// defaults to `user` when those fields are not supplied.
73pub fn canonical_session_key(
74    tenant: impl AsRef<str>,
75    provider: impl AsRef<str>,
76    anchor: Option<&str>,
77    user: Option<&str>,
78) -> SessionKey {
79    SessionKey::new(format!(
80        "{}:{}:{}:{}",
81        tenant.as_ref(),
82        provider.as_ref(),
83        anchor.unwrap_or(DEFAULT_CANONICAL_ANCHOR),
84        user.unwrap_or(DEFAULT_CANONICAL_USER)
85    ))
86}
87
88/// Cursor pointing at a session's position in a flow graph.
89#[derive(Clone, Debug, PartialEq, Eq, Hash)]
90#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
91#[cfg_attr(feature = "schemars", derive(JsonSchema))]
92pub struct SessionCursor {
93    /// Identifier of the node currently owning the session.
94    pub node_pointer: String,
95    /// Optional wait reason emitted by the node.
96    #[cfg_attr(
97        feature = "serde",
98        serde(default, skip_serializing_if = "Option::is_none")
99    )]
100    pub wait_reason: Option<String>,
101    /// Optional marker describing pending outbox operations.
102    #[cfg_attr(
103        feature = "serde",
104        serde(default, skip_serializing_if = "Option::is_none")
105    )]
106    pub outbox_marker: Option<String>,
107}
108
109impl SessionCursor {
110    /// Creates a new cursor pointing at the provided node identifier.
111    pub fn new(node_pointer: impl Into<String>) -> Self {
112        Self {
113            node_pointer: node_pointer.into(),
114            wait_reason: None,
115            outbox_marker: None,
116        }
117    }
118
119    /// Assigns a wait reason to the cursor.
120    pub fn with_wait_reason(mut self, reason: impl Into<String>) -> Self {
121        self.wait_reason = Some(reason.into());
122        self
123    }
124
125    /// Assigns an outbox marker to the cursor.
126    pub fn with_outbox_marker(mut self, marker: impl Into<String>) -> Self {
127        self.outbox_marker = Some(marker.into());
128        self
129    }
130}
131
132/// Persisted session payload describing how to resume a flow.
133#[derive(Clone, Debug, PartialEq, Eq)]
134#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
135#[cfg_attr(feature = "schemars", derive(JsonSchema))]
136pub struct SessionData {
137    /// Tenant context associated with the session.
138    pub tenant_ctx: TenantCtx,
139    /// Flow identifier being executed.
140    pub flow_id: FlowId,
141    /// Optional pack identifier tied to the session.
142    #[cfg_attr(
143        feature = "serde",
144        serde(default, skip_serializing_if = "Option::is_none")
145    )]
146    pub pack_id: Option<PackId>,
147    /// Cursor pinpointing where execution paused.
148    pub cursor: SessionCursor,
149    /// Serialized execution context/state snapshot.
150    pub context_json: String,
151}
152
153/// Stable scope describing where a reply is anchored (conversation/thread/reply).
154#[derive(Clone, Debug, PartialEq, Eq, Hash)]
155#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
156#[cfg_attr(feature = "schemars", derive(JsonSchema))]
157pub struct ReplyScope {
158    /// Conversation identifier.
159    pub conversation: String,
160    /// Optional thread/topic identifier.
161    #[cfg_attr(
162        feature = "serde",
163        serde(default, skip_serializing_if = "Option::is_none")
164    )]
165    pub thread: Option<String>,
166    /// Optional reply-to identifier.
167    #[cfg_attr(
168        feature = "serde",
169        serde(default, skip_serializing_if = "Option::is_none")
170    )]
171    pub reply_to: Option<String>,
172    /// Optional correlation identifier.
173    #[cfg_attr(
174        feature = "serde",
175        serde(default, skip_serializing_if = "Option::is_none")
176    )]
177    pub correlation: Option<String>,
178}
179
180/// Legacy alias for reply scope.
181#[deprecated(since = "0.4.52", note = "use ReplyScope")]
182pub type WaitScope = ReplyScope;
183
184impl ReplyScope {
185    /// Returns a deterministic hash for the scope.
186    pub fn scope_hash(&self) -> String {
187        let mut canonical = String::new();
188        canonical.push_str(self.conversation.as_str());
189        canonical.push('\n');
190        canonical.push_str(self.thread.as_deref().unwrap_or(""));
191        canonical.push('\n');
192        canonical.push_str(self.reply_to.as_deref().unwrap_or(""));
193        canonical.push('\n');
194        canonical.push_str(self.correlation.as_deref().unwrap_or(""));
195
196        let digest = Sha256::digest(canonical.as_bytes());
197        hex_encode(digest.as_slice())
198    }
199}
200
201fn hex_encode(bytes: &[u8]) -> String {
202    const HEX: &[u8; 16] = b"0123456789abcdef";
203    let mut out = String::with_capacity(bytes.len() * 2);
204    for &byte in bytes {
205        out.push(HEX[(byte >> 4) as usize] as char);
206        out.push(HEX[(byte & 0x0f) as usize] as char);
207    }
208    out
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214
215    #[cfg(feature = "serde")]
216    use serde_json::Value;
217
218    #[test]
219    fn canonical_session_key_includes_components() {
220        let key = canonical_session_key("tenant", "webhook", Some("room-1"), Some("user-5"));
221        assert_eq!(key.as_str(), "tenant:webhook:room-1:user-5");
222    }
223
224    #[test]
225    fn canonical_session_key_defaults_anchor_and_user() {
226        let key = canonical_session_key("tenant", "webhook", None, None);
227        assert_eq!(key.as_str(), "tenant:webhook:conversation:user");
228    }
229
230    #[test]
231    fn reply_scope_hash_is_deterministic() {
232        let scope = ReplyScope {
233            conversation: "chat-1".to_owned(),
234            thread: Some("topic-9".to_owned()),
235            reply_to: Some("msg-3".to_owned()),
236            correlation: Some("cid-7".to_owned()),
237        };
238
239        assert_eq!(
240            scope.scope_hash(),
241            "5251e2c9eb975466809eecbe51be7aa8dc7785ad05cb393a8f47a3a2f42cc544"
242        );
243    }
244
245    #[test]
246    fn reply_scope_hash_changes_with_fields() {
247        let base = ReplyScope {
248            conversation: "chat-1".to_owned(),
249            thread: Some("topic-9".to_owned()),
250            reply_to: Some("msg-3".to_owned()),
251            correlation: Some("cid-7".to_owned()),
252        };
253
254        let mut altered = base.clone();
255        altered.reply_to = Some("msg-4".to_owned());
256
257        assert_ne!(base.scope_hash(), altered.scope_hash());
258    }
259
260    #[cfg(feature = "serde")]
261    #[test]
262    fn reply_scope_roundtrip() {
263        let scope = ReplyScope {
264            conversation: "chat-2".to_owned(),
265            thread: None,
266            reply_to: Some("msg-9".to_owned()),
267            correlation: None,
268        };
269
270        let value = serde_json::to_value(&scope)
271            .unwrap_or_else(|err| panic!("serialize reply scope failed: {err}"));
272        let roundtrip: ReplyScope = serde_json::from_value(value)
273            .unwrap_or_else(|err| panic!("deserialize reply scope failed: {err}"));
274
275        assert_eq!(roundtrip, scope);
276    }
277
278    #[cfg(feature = "serde")]
279    #[test]
280    fn session_data_pack_id_is_optional() {
281        let data = SessionData {
282            tenant_ctx: TenantCtx::new(
283                "env"
284                    .parse()
285                    .unwrap_or_else(|err| panic!("parse env failed: {err}")),
286                "tenant"
287                    .parse()
288                    .unwrap_or_else(|err| panic!("parse tenant failed: {err}")),
289            ),
290            flow_id: "flow-1"
291                .parse()
292                .unwrap_or_else(|err| panic!("parse flow failed: {err}")),
293            pack_id: None,
294            cursor: SessionCursor::new("node-1"),
295            context_json: "{}".to_owned(),
296        };
297
298        let value = serde_json::to_value(&data)
299            .unwrap_or_else(|err| panic!("serialize session failed: {err}"));
300        assert!(
301            value.get("pack_id").is_none(),
302            "pack_id should be omitted when None"
303        );
304
305        let mut data_with_pack = data.clone();
306        data_with_pack.pack_id = Some(
307            "greentic.demo.pack"
308                .parse()
309                .unwrap_or_else(|err| panic!("parse pack id failed: {err}")),
310        );
311
312        let value = serde_json::to_value(&data_with_pack)
313            .unwrap_or_else(|err| panic!("serialize session failed: {err}"));
314        assert!(value.get("pack_id").is_some());
315
316        let object = value
317            .as_object()
318            .cloned()
319            .unwrap_or_else(|| panic!("expected session value to be a JSON object"));
320        let roundtrip: SessionData = serde_json::from_value(Value::Object(object))
321            .unwrap_or_else(|err| panic!("deserialize session failed: {err}"));
322        assert_eq!(roundtrip.pack_id, data_with_pack.pack_id);
323    }
324}