1use anyhow::Result;
2use async_trait::async_trait;
3use greentic_types::{FlowId, PackId, SessionCursor, SessionKey, UserId};
4use gsm_core::{
5 ChannelMessage, MessageEnvelope, OutKind, OutMessage, Platform, TenantCtx, egress_subject,
6};
7use gsm_session::{SessionData, SharedSessionStore};
8use gsm_telemetry::set_current_tenant_ctx;
9use serde_json::{Value, json};
10use std::str::FromStr;
11
12use crate::model::Flow;
13use crate::{card_node, qa_node, template_node, tool_node};
14
15#[derive(Clone, Copy, Debug)]
16pub enum ToolMode {
17 Live,
18 Stub,
19}
20
21#[derive(Clone, Debug)]
22pub struct ExecutionOptions {
23 pub tool_mode: ToolMode,
24 pub allow_agent: bool,
25 pub tool_endpoint: String,
26}
27
28#[derive(Clone, Debug, serde::Serialize)]
29pub struct ToolCall {
30 pub tool: String,
31 pub action: String,
32 pub input: Value,
33}
34
35#[derive(Debug)]
36pub struct RunnerOutcome {
37 pub out_messages: Vec<OutMessage>,
38 pub tool_calls: Vec<ToolCall>,
39 pub state: Value,
40}
41
42#[async_trait]
43pub trait RunnerSink: Send + Sync {
44 async fn publish_out_message(&self, subject: &str, out: &OutMessage) -> Result<()>;
45}
46
47#[async_trait]
48impl RunnerSink for async_nats::Client {
49 async fn publish_out_message(&self, subject: &str, out: &OutMessage) -> Result<()> {
50 self.publish(subject.to_string(), serde_json::to_vec(out)?.into())
51 .await?;
52 Ok(())
53 }
54}
55
56pub fn message_from_channel(channel: &ChannelMessage) -> Result<MessageEnvelope> {
57 let platform = Platform::from_str(channel.channel_id.as_str())
58 .map_err(|err| anyhow::anyhow!("invalid platform: {err}"))?;
59 let payload = &channel.payload;
60 let chat_id = payload
61 .get("chat_id")
62 .and_then(|v| v.as_str())
63 .map(str::to_string)
64 .unwrap_or_else(|| channel.session_id.clone());
65 if chat_id.trim().is_empty() {
66 anyhow::bail!("channel message missing chat_id");
67 }
68 let msg_id = payload
69 .get("msg_id")
70 .and_then(|v| v.as_str())
71 .unwrap_or("unknown")
72 .to_string();
73 let timestamp = payload
74 .get("timestamp")
75 .and_then(|v| v.as_str())
76 .unwrap_or("1970-01-01T00:00:00Z")
77 .to_string();
78 let user_id = payload
79 .get("user_id")
80 .and_then(|v| v.as_str())
81 .unwrap_or("unknown")
82 .to_string();
83 let thread_id = payload
84 .get("thread_id")
85 .and_then(|v| v.as_str())
86 .map(str::to_string);
87 let text = payload
88 .get("text")
89 .and_then(|v| v.as_str())
90 .map(str::to_string);
91
92 let mut context = std::collections::BTreeMap::new();
93 if let Some(meta) = payload.get("metadata").and_then(|v| v.as_object()) {
94 for (k, v) in meta {
95 context.insert(k.clone(), v.clone());
96 }
97 }
98 if let Some(headers) = payload.get("headers") {
99 context.insert("headers".into(), headers.clone());
100 }
101
102 Ok(MessageEnvelope {
103 tenant: channel.tenant.tenant.as_str().to_string(),
104 platform,
105 chat_id,
106 user_id,
107 thread_id,
108 msg_id,
109 text,
110 timestamp,
111 context,
112 })
113}
114
115#[allow(clippy::too_many_arguments)]
116pub async fn run_flow(
117 flow_id: &str,
118 flow: &Flow,
119 tenant_ctx: &TenantCtx,
120 env: &MessageEnvelope,
121 sessions: &SharedSessionStore,
122 hbs: &handlebars::Handlebars<'static>,
123 sink: &dyn RunnerSink,
124 options: &ExecutionOptions,
125 pack_id: Option<PackId>,
126) -> Result<RunnerOutcome> {
127 let active_user = tenant_ctx
128 .user
129 .clone()
130 .or_else(|| tenant_ctx.user_id.clone())
131 .or_else(|| UserId::try_from(env.user_id.as_str()).ok());
132 let mut previous_session: Option<SessionKey> = None;
133 let mut state = if let Some(user) = active_user.clone() {
134 match sessions.find_by_user(tenant_ctx, &user).await {
135 Ok(Some((key, data))) => {
136 previous_session = Some(key);
137 match serde_json::from_str::<serde_json::Value>(&data.context_json) {
138 Ok(value) if value.is_object() => value,
139 Ok(_) => json!({}),
140 Err(err) => {
141 tracing::warn!(error = %err, "failed to parse stored session context");
142 json!({})
143 }
144 }
145 }
146 Ok(None) => json!({}),
147 Err(err) => {
148 tracing::warn!(error = %err, "session lookup failed; starting fresh");
149 json!({})
150 }
151 }
152 } else {
153 json!({})
154 };
155
156 let mut current = flow.r#in.clone();
157 let mut payload: serde_json::Value = serde_json::json!({});
158 let mut out_messages = Vec::new();
159 let mut tool_calls = Vec::new();
160
161 loop {
162 let node = flow
163 .nodes
164 .get(¤t)
165 .ok_or_else(|| anyhow::anyhow!("node not found: {current}"))?;
166 tracing::info!("node={}", current);
167
168 if let Some(qa) = &node.qa {
169 if options.allow_agent {
170 qa_node::run_qa(qa, env, &mut state, hbs).await?;
171 } else {
172 qa_node::run_qa_offline(qa, env, &mut state).await?;
173 }
174 }
175
176 if let Some(tool) = &node.tool {
177 let input = tool_node::render_tool_input(tool, env, &state)?;
178 tool_calls.push(ToolCall {
179 tool: tool.tool.clone(),
180 action: tool.action.clone(),
181 input: input.clone(),
182 });
183 payload = match options.tool_mode {
184 ToolMode::Live => {
185 tool_node::run_tool_with_input(tool, input, options.tool_endpoint.as_str())
186 .await?
187 }
188 ToolMode::Stub => tool_node::run_tool_stub_with_input(input)?,
189 };
190 }
191
192 if let Some(tpl) = &node.template {
193 let out = template_node::render_template(tpl, hbs, env, &state, &payload)?;
194 let outmsg = OutMessage {
195 ctx: tenant_ctx.clone(),
196 tenant: env.tenant.clone(),
197 platform: env.platform.clone(),
198 chat_id: env.chat_id.clone(),
199 thread_id: env.thread_id.clone(),
200 kind: OutKind::Text,
201 text: Some(out),
202 message_card: None,
203 adaptive_card: None,
204 meta: env.context.clone(),
205 };
206 let team = tenant_ctx
207 .team
208 .as_ref()
209 .map(|team| team.as_str())
210 .unwrap_or("default");
211 let subject = egress_subject(
212 tenant_ctx.env.as_str(),
213 tenant_ctx.tenant.as_str(),
214 team,
215 env.platform.as_str(),
216 );
217 sink.publish_out_message(&subject, &outmsg).await?;
218 out_messages.push(outmsg);
219 }
220
221 if let Some(card) = &node.card {
222 let card = card_node::render_card(card, hbs, env, &state, &payload)?;
223 let outmsg = OutMessage {
224 ctx: tenant_ctx.clone(),
225 tenant: env.tenant.clone(),
226 platform: env.platform.clone(),
227 chat_id: env.chat_id.clone(),
228 thread_id: env.thread_id.clone(),
229 kind: OutKind::Card,
230 text: None,
231 message_card: Some(card),
232 adaptive_card: None,
233 meta: env.context.clone(),
234 };
235 let team = tenant_ctx
236 .team
237 .as_ref()
238 .map(|team| team.as_str())
239 .unwrap_or("default");
240 let subject = egress_subject(
241 tenant_ctx.env.as_str(),
242 tenant_ctx.tenant.as_str(),
243 team,
244 env.platform.as_str(),
245 );
246 sink.publish_out_message(&subject, &outmsg).await?;
247 out_messages.push(outmsg);
248 }
249
250 if let Some(next) = node.routes.first() {
251 if next == "end" {
252 break;
253 }
254 current = next.clone();
255 continue;
256 }
257 break;
258 }
259
260 let session_data = SessionData {
261 tenant_ctx: tenant_ctx.clone(),
262 flow_id: FlowId::new(flow_id)?,
263 pack_id,
264 cursor: SessionCursor::new(current),
265 context_json: serde_json::to_string(&state)?,
266 };
267
268 if let Some(existing_key) = previous_session {
269 sessions.update_session(&existing_key, session_data).await?;
270 } else if active_user.is_some() {
271 sessions.create_session(tenant_ctx, session_data).await?;
272 } else {
273 tracing::debug!("skipping session persistence; no user context available");
274 }
275
276 Ok(RunnerOutcome {
277 out_messages,
278 tool_calls,
279 state,
280 })
281}
282
283pub fn set_tenant_ctx(ctx: TenantCtx) {
284 set_current_tenant_ctx(ctx);
285}
286
287pub fn env_from_channel(channel: &ChannelMessage) -> Result<MessageEnvelope> {
288 message_from_channel(channel)
289}