gsm_runner/
engine.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use greentic_types::{FlowId, PackId, SessionCursor, SessionKey, UserId};
4use gsm_core::{
5    ChannelMessage, MessageEnvelope, OutKind, OutMessage, Platform, TenantCtx, egress_subject,
6};
7use gsm_session::{SessionData, SharedSessionStore};
8use gsm_telemetry::set_current_tenant_ctx;
9use serde_json::{Value, json};
10use std::str::FromStr;
11
12use crate::model::Flow;
13use crate::{card_node, qa_node, template_node, tool_node};
14
15#[derive(Clone, Copy, Debug)]
16pub enum ToolMode {
17    Live,
18    Stub,
19}
20
21#[derive(Clone, Debug)]
22pub struct ExecutionOptions {
23    pub tool_mode: ToolMode,
24    pub allow_agent: bool,
25    pub tool_endpoint: String,
26}
27
28#[derive(Clone, Debug, serde::Serialize)]
29pub struct ToolCall {
30    pub tool: String,
31    pub action: String,
32    pub input: Value,
33}
34
35#[derive(Debug)]
36pub struct RunnerOutcome {
37    pub out_messages: Vec<OutMessage>,
38    pub tool_calls: Vec<ToolCall>,
39    pub state: Value,
40}
41
42#[async_trait]
43pub trait RunnerSink: Send + Sync {
44    async fn publish_out_message(&self, subject: &str, out: &OutMessage) -> Result<()>;
45}
46
47#[async_trait]
48impl RunnerSink for async_nats::Client {
49    async fn publish_out_message(&self, subject: &str, out: &OutMessage) -> Result<()> {
50        self.publish(subject.to_string(), serde_json::to_vec(out)?.into())
51            .await?;
52        Ok(())
53    }
54}
55
56pub fn message_from_channel(channel: &ChannelMessage) -> Result<MessageEnvelope> {
57    let platform = Platform::from_str(channel.channel_id.as_str())
58        .map_err(|err| anyhow::anyhow!("invalid platform: {err}"))?;
59    let payload = &channel.payload;
60    let chat_id = payload
61        .get("chat_id")
62        .and_then(|v| v.as_str())
63        .map(str::to_string)
64        .unwrap_or_else(|| channel.session_id.clone());
65    if chat_id.trim().is_empty() {
66        anyhow::bail!("channel message missing chat_id");
67    }
68    let msg_id = payload
69        .get("msg_id")
70        .and_then(|v| v.as_str())
71        .unwrap_or("unknown")
72        .to_string();
73    let timestamp = payload
74        .get("timestamp")
75        .and_then(|v| v.as_str())
76        .unwrap_or("1970-01-01T00:00:00Z")
77        .to_string();
78    let user_id = payload
79        .get("user_id")
80        .and_then(|v| v.as_str())
81        .unwrap_or("unknown")
82        .to_string();
83    let thread_id = payload
84        .get("thread_id")
85        .and_then(|v| v.as_str())
86        .map(str::to_string);
87    let text = payload
88        .get("text")
89        .and_then(|v| v.as_str())
90        .map(str::to_string);
91
92    let mut context = std::collections::BTreeMap::new();
93    if let Some(meta) = payload.get("metadata").and_then(|v| v.as_object()) {
94        for (k, v) in meta {
95            context.insert(k.clone(), v.clone());
96        }
97    }
98    if let Some(headers) = payload.get("headers") {
99        context.insert("headers".into(), headers.clone());
100    }
101
102    Ok(MessageEnvelope {
103        tenant: channel.tenant.tenant.as_str().to_string(),
104        platform,
105        chat_id,
106        user_id,
107        thread_id,
108        msg_id,
109        text,
110        timestamp,
111        context,
112    })
113}
114
115#[allow(clippy::too_many_arguments)]
116pub async fn run_flow(
117    flow_id: &str,
118    flow: &Flow,
119    tenant_ctx: &TenantCtx,
120    env: &MessageEnvelope,
121    sessions: &SharedSessionStore,
122    hbs: &handlebars::Handlebars<'static>,
123    sink: &dyn RunnerSink,
124    options: &ExecutionOptions,
125    pack_id: Option<PackId>,
126) -> Result<RunnerOutcome> {
127    let active_user = tenant_ctx
128        .user
129        .clone()
130        .or_else(|| tenant_ctx.user_id.clone())
131        .or_else(|| UserId::try_from(env.user_id.as_str()).ok());
132    let mut previous_session: Option<SessionKey> = None;
133    let mut state = if let Some(user) = active_user.clone() {
134        match sessions.find_by_user(tenant_ctx, &user).await {
135            Ok(Some((key, data))) => {
136                previous_session = Some(key);
137                match serde_json::from_str::<serde_json::Value>(&data.context_json) {
138                    Ok(value) if value.is_object() => value,
139                    Ok(_) => json!({}),
140                    Err(err) => {
141                        tracing::warn!(error = %err, "failed to parse stored session context");
142                        json!({})
143                    }
144                }
145            }
146            Ok(None) => json!({}),
147            Err(err) => {
148                tracing::warn!(error = %err, "session lookup failed; starting fresh");
149                json!({})
150            }
151        }
152    } else {
153        json!({})
154    };
155
156    let mut current = flow.r#in.clone();
157    let mut payload: serde_json::Value = serde_json::json!({});
158    let mut out_messages = Vec::new();
159    let mut tool_calls = Vec::new();
160
161    loop {
162        let node = flow
163            .nodes
164            .get(&current)
165            .ok_or_else(|| anyhow::anyhow!("node not found: {current}"))?;
166        tracing::info!("node={}", current);
167
168        if let Some(qa) = &node.qa {
169            if options.allow_agent {
170                qa_node::run_qa(qa, env, &mut state, hbs).await?;
171            } else {
172                qa_node::run_qa_offline(qa, env, &mut state).await?;
173            }
174        }
175
176        if let Some(tool) = &node.tool {
177            let input = tool_node::render_tool_input(tool, env, &state)?;
178            tool_calls.push(ToolCall {
179                tool: tool.tool.clone(),
180                action: tool.action.clone(),
181                input: input.clone(),
182            });
183            payload = match options.tool_mode {
184                ToolMode::Live => {
185                    tool_node::run_tool_with_input(tool, input, options.tool_endpoint.as_str())
186                        .await?
187                }
188                ToolMode::Stub => tool_node::run_tool_stub_with_input(input)?,
189            };
190        }
191
192        if let Some(tpl) = &node.template {
193            let out = template_node::render_template(tpl, hbs, env, &state, &payload)?;
194            let outmsg = OutMessage {
195                ctx: tenant_ctx.clone(),
196                tenant: env.tenant.clone(),
197                platform: env.platform.clone(),
198                chat_id: env.chat_id.clone(),
199                thread_id: env.thread_id.clone(),
200                kind: OutKind::Text,
201                text: Some(out),
202                message_card: None,
203                adaptive_card: None,
204                meta: env.context.clone(),
205            };
206            let team = tenant_ctx
207                .team
208                .as_ref()
209                .map(|team| team.as_str())
210                .unwrap_or("default");
211            let subject = egress_subject(
212                tenant_ctx.env.as_str(),
213                tenant_ctx.tenant.as_str(),
214                team,
215                env.platform.as_str(),
216            );
217            sink.publish_out_message(&subject, &outmsg).await?;
218            out_messages.push(outmsg);
219        }
220
221        if let Some(card) = &node.card {
222            let card = card_node::render_card(card, hbs, env, &state, &payload)?;
223            let outmsg = OutMessage {
224                ctx: tenant_ctx.clone(),
225                tenant: env.tenant.clone(),
226                platform: env.platform.clone(),
227                chat_id: env.chat_id.clone(),
228                thread_id: env.thread_id.clone(),
229                kind: OutKind::Card,
230                text: None,
231                message_card: Some(card),
232                adaptive_card: None,
233                meta: env.context.clone(),
234            };
235            let team = tenant_ctx
236                .team
237                .as_ref()
238                .map(|team| team.as_str())
239                .unwrap_or("default");
240            let subject = egress_subject(
241                tenant_ctx.env.as_str(),
242                tenant_ctx.tenant.as_str(),
243                team,
244                env.platform.as_str(),
245            );
246            sink.publish_out_message(&subject, &outmsg).await?;
247            out_messages.push(outmsg);
248        }
249
250        if let Some(next) = node.routes.first() {
251            if next == "end" {
252                break;
253            }
254            current = next.clone();
255            continue;
256        }
257        break;
258    }
259
260    let session_data = SessionData {
261        tenant_ctx: tenant_ctx.clone(),
262        flow_id: FlowId::new(flow_id)?,
263        pack_id,
264        cursor: SessionCursor::new(current),
265        context_json: serde_json::to_string(&state)?,
266    };
267
268    if let Some(existing_key) = previous_session {
269        sessions.update_session(&existing_key, session_data).await?;
270    } else if active_user.is_some() {
271        sessions.create_session(tenant_ctx, session_data).await?;
272    } else {
273        tracing::debug!("skipping session persistence; no user context available");
274    }
275
276    Ok(RunnerOutcome {
277        out_messages,
278        tool_calls,
279        state,
280    })
281}
282
283pub fn set_tenant_ctx(ctx: TenantCtx) {
284    set_current_tenant_ctx(ctx);
285}
286
287pub fn env_from_channel(channel: &ChannelMessage) -> Result<MessageEnvelope> {
288    message_from_channel(channel)
289}