// greentic_runner_host/activity.rs

1use serde::{Deserialize, Serialize};
2use serde_json::{Value, json};
3
4/// High-level activity payload exchanged with Greentic hosts.
5#[derive(Clone, Debug, Serialize, Deserialize)]
6pub struct Activity {
7    #[serde(default)]
8    pub(crate) kind: ActivityKind,
9    #[serde(default, skip_serializing_if = "Option::is_none")]
10    tenant: Option<String>,
11    #[serde(default, skip_serializing_if = "Option::is_none")]
12    pack_id: Option<String>,
13    #[serde(default, skip_serializing_if = "Option::is_none")]
14    flow_id: Option<String>,
15    #[serde(default, skip_serializing_if = "Option::is_none")]
16    flow_type: Option<String>,
17    #[serde(default, skip_serializing_if = "Option::is_none")]
18    session_id: Option<String>,
19    #[serde(default, skip_serializing_if = "Option::is_none")]
20    provider_id: Option<String>,
21    /// Multi-instance messaging endpoint id (M1.4). Disambiguates provider
22    /// instances of the same `provider_type` so sessions/traces partition
23    /// per-endpoint. Producer-set; the runner threads it into
24    /// `IngressEnvelope.messaging_endpoint_id`.
25    #[serde(default, skip_serializing_if = "Option::is_none")]
26    messaging_endpoint_id: Option<String>,
27    #[serde(default, skip_serializing_if = "Option::is_none")]
28    user_id: Option<String>,
29    #[serde(default, skip_serializing_if = "Option::is_none")]
30    channel_id: Option<String>,
31    #[serde(default, skip_serializing_if = "Option::is_none")]
32    conversation_id: Option<String>,
33    /// M1.5 welcome-flow override hint. Producer-supplied — the runner uses
34    /// it as a one-way override of the resolved `(pack_id, flow_id,
35    /// flow_type)` when **all** of: a messaging endpoint is asserted AND
36    /// this hint is present AND no active wait snapshot exists in this
37    /// pack's session bucket. See [`WelcomeFlowHint`] for the contract —
38    /// the **producer** decides when this is actually first contact; the
39    /// runner-host only refuses to override on top of an active wait.
40    #[serde(default, skip_serializing_if = "Option::is_none")]
41    welcome_flow_hint: Option<WelcomeFlowHint>,
42    #[serde(default)]
43    payload: Value,
44}
45
46/// M1.5 welcome-flow override hint: the `(pack_id, flow_id)` a producer wants
47/// the runner to dispatch when this activity is the user's first contact on
48/// the asserted messaging endpoint. Encodes both axes because welcome flows
49/// can live in a different pack from the resolved one (e.g. greentic-start
50/// reads the endpoint's `welcome_flow` from `Environment.messaging_endpoints`
51/// and attaches it here).
52///
53/// # First-contact ownership is on the producer
54///
55/// The runner-host's first-contact probe checks `FlowResumeStore::fetch`,
56/// which only finds **active wait snapshots**. A flow that completed (or
57/// completed without ever calling `session.wait`) leaves NO marker, so a
58/// post-completion turn would also see "no wait" and re-fire welcome. To
59/// avoid welcome-loop behaviour the producer is responsible for **only
60/// attaching this hint when the activity is actually first contact** — for
61/// greentic-start that means consulting a durable welcome-seen marker
62/// (planned follow-up) before attaching.
63///
64/// The runner-host's role is narrowly:
65/// 1. Honour the producer's hint when no active wait exists (override
66///    `(pack_id, flow_id, flow_type)` before the state machine runs).
67/// 2. Refuse to override when an active wait exists, even if the hint is
68///    present — that's the safety net against accidentally re-routing a
69///    mid-conversation turn.
70///
71/// Attaching the hint on every turn is **NOT** safe under this contract —
72/// it would route every no-wait turn through welcome.
73#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
74pub struct WelcomeFlowHint {
75    pub pack_id: String,
76    pub flow_id: String,
77}
78
79#[derive(Clone, Debug, Serialize, Deserialize, Default)]
80#[serde(tag = "kind", rename_all = "snake_case")]
81pub enum ActivityKind {
82    /// Messaging-style activity (default).
83    #[default]
84    Message,
85    /// Custom activity with user-specified action + optional flow type override.
86    Custom {
87        action: String,
88        #[serde(default, skip_serializing_if = "Option::is_none")]
89        flow_type: Option<String>,
90    },
91}
92
93impl Activity {
94    /// Create a text messaging activity payload.
95    pub fn text(text: impl Into<String>) -> Self {
96        Self {
97            kind: ActivityKind::Message,
98            tenant: None,
99            pack_id: None,
100            flow_id: None,
101            flow_type: Some("messaging".into()),
102            session_id: None,
103            provider_id: None,
104            messaging_endpoint_id: None,
105            user_id: None,
106            channel_id: None,
107            conversation_id: None,
108            welcome_flow_hint: None,
109            payload: json!({ "text": text.into() }),
110        }
111    }
112
113    /// Build a custom activity with a raw payload body.
114    pub fn custom(action: impl Into<String>, payload: Value) -> Self {
115        Self {
116            kind: ActivityKind::Custom {
117                action: action.into(),
118                flow_type: None,
119            },
120            tenant: None,
121            pack_id: None,
122            flow_id: None,
123            flow_type: None,
124            session_id: None,
125            provider_id: None,
126            messaging_endpoint_id: None,
127            user_id: None,
128            channel_id: None,
129            conversation_id: None,
130            welcome_flow_hint: None,
131            payload,
132        }
133    }
134
135    /// Attach the M1.5 welcome-flow override hint. See [`WelcomeFlowHint`]
136    /// for the contract — the **producer** is responsible for only calling
137    /// this when the activity is actually first contact on the asserted
138    /// messaging endpoint. Attaching on every turn would route every
139    /// no-active-wait turn through the welcome flow.
140    pub fn with_welcome_flow_hint(mut self, hint: WelcomeFlowHint) -> Self {
141        self.welcome_flow_hint = Some(hint);
142        self
143    }
144
145    /// Return the welcome-flow override hint, if any.
146    pub fn welcome_flow_hint(&self) -> Option<&WelcomeFlowHint> {
147        self.welcome_flow_hint.as_ref()
148    }
149
150    /// Attach a tenant identifier to the activity.
151    pub fn with_tenant(mut self, tenant: impl Into<String>) -> Self {
152        self.tenant = Some(tenant.into());
153        self
154    }
155
156    /// Target a specific flow identifier.
157    pub fn with_flow(mut self, flow_id: impl Into<String>) -> Self {
158        self.flow_id = Some(flow_id.into());
159        self
160    }
161
162    /// Target a specific pack identifier.
163    pub fn with_pack(mut self, pack_id: impl Into<String>) -> Self {
164        self.pack_id = Some(pack_id.into());
165        self
166    }
167
168    /// Hint which flow type should handle the activity.
169    pub fn with_flow_type(mut self, flow_type: impl Into<String>) -> Self {
170        let flow_type = flow_type.into();
171        self.flow_type = Some(flow_type.clone());
172        if let ActivityKind::Custom {
173            flow_type: inner, ..
174        } = &mut self.kind
175        {
176            *inner = Some(flow_type);
177        }
178        self
179    }
180
181    pub(crate) fn with_payload_field(mut self, key: impl Into<String>, value: Value) -> Self {
182        match &mut self.payload {
183            Value::Object(object) => {
184                object.insert(key.into(), value);
185            }
186            existing => {
187                let original = std::mem::replace(existing, Value::Null);
188                *existing = json!({
189                    key.into(): value,
190                    "value": original,
191                });
192            }
193        }
194        self
195    }
196
197    /// Attach a session identifier used for retries/idempotency.
198    pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
199        self.session_id = Some(session_id.into());
200        self
201    }
202
203    /// Attach a provider identifier for telemetry scoping.
204    pub fn with_provider(mut self, provider: impl Into<String>) -> Self {
205        self.provider_id = Some(provider.into());
206        self
207    }
208
209    /// Attach the receiving messaging endpoint id (M1.4). Distinguishes
210    /// provider instances of the same `provider_type` (e.g. `teams-legal`
211    /// vs `teams-accounting`); the runner threads it into the envelope so
212    /// session keys and telemetry partition per-endpoint.
213    pub fn with_messaging_endpoint(mut self, endpoint_id: impl Into<String>) -> Self {
214        self.messaging_endpoint_id = Some(endpoint_id.into());
215        self
216    }
217
218    /// Attach the originating user for messaging activities.
219    pub fn from_user(mut self, user: impl Into<String>) -> Self {
220        self.user_id = Some(user.into());
221        self
222    }
223
224    /// Attach a channel identifier (chat, room, or queue) for canonical session keys.
225    pub fn in_channel(mut self, channel: impl Into<String>) -> Self {
226        self.channel_id = Some(channel.into());
227        self
228    }
229
230    /// Attach a conversation/thread identifier for canonical session keys.
231    pub fn in_conversation(mut self, conversation: impl Into<String>) -> Self {
232        self.conversation_id = Some(conversation.into());
233        self
234    }
235
236    /// Return the resolved tenant identifier, if any.
237    pub fn tenant(&self) -> Option<&str> {
238        self.tenant.as_deref()
239    }
240
241    /// Return the resolved pack identifier, if any.
242    pub fn pack_id(&self) -> Option<&str> {
243        self.pack_id.as_deref()
244    }
245
246    /// Return the resolved flow identifier hint.
247    pub fn flow_id(&self) -> Option<&str> {
248        self.flow_id.as_deref()
249    }
250
251    /// Return the resolved flow type hint.
252    pub fn flow_type(&self) -> Option<&str> {
253        self.flow_type
254            .as_deref()
255            .or_else(|| self.kind.flow_type_hint())
256    }
257
258    /// Return the originating session identifier, if supplied.
259    pub fn session_id(&self) -> Option<&str> {
260        self.session_id.as_deref()
261    }
262
263    /// Return the originating provider identifier, if supplied.
264    pub fn provider_id(&self) -> Option<&str> {
265        self.provider_id.as_deref()
266    }
267
268    /// Return the receiving messaging endpoint id (M1.4), if supplied.
269    pub fn messaging_endpoint_id(&self) -> Option<&str> {
270        self.messaging_endpoint_id.as_deref()
271    }
272
273    /// Return the originating user identifier, if supplied.
274    pub fn user(&self) -> Option<&str> {
275        self.user_id.as_deref()
276    }
277
278    /// Return the channel identifier, if supplied.
279    pub fn channel(&self) -> Option<&str> {
280        self.channel_id.as_deref()
281    }
282
283    /// Return the conversation identifier, if supplied.
284    pub fn conversation(&self) -> Option<&str> {
285        self.conversation_id.as_deref()
286    }
287
288    /// Underlying payload body.
289    pub fn payload(&self) -> &Value {
290        &self.payload
291    }
292
293    pub(crate) fn action(&self) -> Option<&str> {
294        self.kind.action_hint()
295    }
296
297    pub(crate) fn into_payload(self) -> Value {
298        self.payload
299    }
300
301    pub(crate) fn ensure_tenant(mut self, tenant: &str) -> Self {
302        if self.tenant.is_none() {
303            self.tenant = Some(tenant.to_string());
304        }
305        self
306    }
307
308    pub(crate) fn from_output(payload: Value, tenant: &str) -> Self {
309        Activity::custom("response", payload).ensure_tenant(tenant)
310    }
311}
312
313impl ActivityKind {
314    fn flow_type_hint(&self) -> Option<&str> {
315        match self {
316            ActivityKind::Message => Some("messaging"),
317            ActivityKind::Custom { flow_type, .. } => flow_type.as_deref(),
318        }
319    }
320
321    fn action_hint(&self) -> Option<&str> {
322        match self {
323            ActivityKind::Message => Some("messaging"),
324            ActivityKind::Custom { action, .. } => Some(action.as_str()),
325        }
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder stores the endpoint id and the getter exposes it.
    #[test]
    fn with_messaging_endpoint_sets_field() {
        let endpoint = Activity::text("hi")
            .with_messaging_endpoint("teams-legal")
            .messaging_endpoint_id()
            .map(str::to_owned);
        assert_eq!(endpoint.as_deref(), Some("teams-legal"));
    }

    /// Adding a field to an object payload keeps the existing entries.
    #[test]
    fn with_payload_field_adds_field_to_object_payload() {
        let entities = json!({"entities": [{"kind": "date"}]});
        let activity = Activity::text("show traffic").with_payload_field("fast2flow", entities);
        let payload = activity.payload();

        assert_eq!(payload["fast2flow"]["entities"][0]["kind"], "date");
        assert_eq!(payload["text"], "show traffic");
    }

    /// A freshly built text activity carries no endpoint id.
    #[test]
    fn messaging_endpoint_id_defaults_to_none() {
        assert!(Activity::text("hi").messaging_endpoint_id().is_none());
    }

    /// The endpoint id appears on the wire and survives decoding.
    #[test]
    fn messaging_endpoint_id_round_trips_through_serde() {
        let original = Activity::text("hi").with_messaging_endpoint("teams-legal");

        let wire = serde_json::to_string(&original).expect("serialize");
        assert!(wire.contains("\"messaging_endpoint_id\":\"teams-legal\""));

        let restored: Activity = serde_json::from_str(&wire).expect("deserialize");
        assert_eq!(restored.messaging_endpoint_id(), Some("teams-legal"));
    }

    /// Wire-compat and skip-if-none in one check: an activity without the
    /// field leaves it off the wire AND decodes back with the field unset,
    /// so pre-M1.4 producers/consumers interoperate cleanly.
    #[test]
    fn messaging_endpoint_id_serde_skips_when_unset() {
        let plain = Activity::text("hi");

        let wire = serde_json::to_string(&plain).expect("serialize");
        assert!(!wire.contains("messaging_endpoint_id"));

        let restored: Activity = serde_json::from_str(&wire).expect("deserialize");
        assert!(restored.messaging_endpoint_id().is_none());
    }
}
376}