1use std::collections::HashMap;
2
3use axum::{
4 extract::{Path, Query, State},
5 http::StatusCode,
6 Json,
7};
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value};
10use uuid::Uuid;
11
12use crate::{
13 AppState, AutomationAgentMcpPolicy, AutomationAgentProfile, AutomationAgentToolPolicy,
14 AutomationExecutionPolicy, AutomationFlowNode, AutomationFlowOutputContract,
15 AutomationFlowSpec, AutomationOutputValidatorKind, AutomationV2Schedule,
16 AutomationV2ScheduleType, AutomationV2Spec, AutomationV2Status, RoutineMisfirePolicy,
17};
18
/// Lifetime of a draft, in milliseconds (10 minutes). Added to "now" to compute
/// `expires_at_ms` when a draft is created and every time it is advanced.
const CHANNEL_DRAFT_TTL_MS: u64 = 10 * 60 * 1000;
/// Reply sent when the draft's `workflow_planner_enabled` flag is explicitly `false`.
const CHANNEL_WORKFLOW_DRAFTING_DISABLED_MESSAGE: &str =
    "Workflow drafting is disabled for this channel. Enable the workflow planner gate in Settings to continue.";
/// Reply sent when strict KB grounding is on and the message is a factual question
/// without explicit workflow intent.
const STRICT_KB_FACTUAL_DRAFT_BLOCKED_MESSAGE: &str =
    "I do not see that in the connected knowledgebase.";
24
/// Lifecycle state of a channel automation draft (serialized in `snake_case`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ChannelAutomationDraftStatus {
    /// A required field (goal or schedule hint) is still missing; a question is pending.
    Collecting,
    /// Goal and schedule hint are known; a preview is attached and the draft can be confirmed.
    PreviewReady,
    /// The draft was confirmed and an automation was created from it.
    Applied,
    /// The draft was cancelled, either by the user or by a producer guard.
    Cancelled,
    /// Treated as a closed state; no code in this file assigns it.
    Expired,
    /// Treated as an open/pending state; no code in this file assigns it.
    Blocked,
}
35
/// Describes the chat location a draft originated from. Every field is optional on the wire.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelAutomationDraftChannelContext {
    /// Platform/channel name; used as the key for per-channel config lookups and in the
    /// automation's output target.
    #[serde(default)]
    pub source_platform: String,
    /// Kind of scope the draft belongs to. Not interpreted in this file.
    #[serde(default)]
    pub scope_kind: String,
    /// Scope identifier; used for pending-draft filtering and in the output target.
    #[serde(default)]
    pub scope_id: String,
    /// Not read in this file — presumably where replies are delivered; confirm against callers.
    #[serde(default)]
    pub reply_target: String,
    /// Sender identifier; becomes the automation's `creator_id` when non-blank.
    #[serde(default)]
    pub sender: String,
    /// Session identifier; falls back to the start request's top-level `session_id`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Thread key; falls back to the start request's top-level `thread_key`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub thread_key: Option<String>,
    /// Not read in this file; carried through with the rest of the context.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger_source: Option<String>,
}
55
/// A follow-up question asked of the user while a draft is still collecting details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelAutomationDraftQuestion {
    /// Name of the draft field the answer fills (e.g. `goal`, `schedule_hint`).
    pub field: String,
    /// Question text shown to the user.
    pub text: String,
    /// Suggested answers, rendered as a numbered list in the chat message.
    #[serde(default)]
    pub options: Vec<String>,
}
63
/// Summary of a draft that is ready for confirmation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelAutomationDraftPreview {
    /// One-line human-readable description built from goal, schedule and delivery target.
    pub summary: String,
    /// The goal the automation will pursue.
    pub goal: String,
    /// Free-text schedule hint (e.g. `daily`, `weekly`, `event-driven`).
    pub schedule_hint: String,
    /// Where results are reported (defaults to `same_chat`).
    pub delivery_target: String,
}
71
/// Persisted state of one channel automation draft conversation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelAutomationDraftRecord {
    /// Unique id of the form `channel-draft-<uuid>`.
    pub draft_id: String,
    /// Current lifecycle state.
    pub status: ChannelAutomationDraftStatus,
    /// Trimmed message text that started the draft.
    pub original_text: String,
    /// What the automation should do, once known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub goal: Option<String>,
    /// Free-text schedule or trigger description, once known.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schedule_hint: Option<String>,
    /// Where results should be reported.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub delivery_target: Option<String>,
    /// Names of fields still required; rebuilt each time the draft is advanced.
    #[serde(default)]
    pub missing_fields: Vec<String>,
    /// The question currently awaiting an answer, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub question: Option<ChannelAutomationDraftQuestion>,
    /// Preview attached once the draft is ready for confirmation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub preview: Option<ChannelAutomationDraftPreview>,
    /// Id of the automation created when the draft was confirmed.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_id: Option<String>,
    /// Tool allowlist copied into the created agent's tool policy.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// MCP servers copied into the created agent's MCP policy.
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    /// MCP tools copied into the created agent's MCP policy (empty means no explicit list).
    #[serde(default)]
    pub allowed_mcp_tools: Vec<String>,
    /// Security profile name; only recorded in the created automation's metadata here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub security_profile: Option<String>,
    /// Workflow planner gate; `None` is treated as enabled by the producer guard.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_planner_enabled: Option<bool>,
    /// Whether strict knowledgebase grounding applied to the latest message.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub strict_kb_grounding: Option<bool>,
    /// Whether the latest message was classified as a factual question.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub factual_question: Option<bool>,
    /// Whether the latest message explicitly asked for a workflow/automation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub explicit_workflow_intent: Option<bool>,
    /// Chat location the draft originated from.
    pub channel_context: ChannelAutomationDraftChannelContext,
    /// Creation time in milliseconds (from `crate::now_ms()`).
    pub created_at_ms: u64,
    /// Last modification time in milliseconds.
    pub updated_at_ms: u64,
    /// Time after which the draft is rejected as expired, in milliseconds.
    pub expires_at_ms: u64,
}
112
/// Request body for starting a new draft.
#[derive(Debug, Deserialize)]
pub struct ChannelAutomationDraftStartRequest {
    /// The chat message to draft from; must not be blank.
    pub text: String,
    /// Used only when `channel_context.session_id` is absent.
    #[serde(default)]
    pub session_id: Option<String>,
    /// Used only when `channel_context.thread_key` is absent.
    #[serde(default)]
    pub thread_key: Option<String>,
    /// Chat location the message came from.
    #[serde(default)]
    pub channel_context: ChannelAutomationDraftChannelContext,
    /// Tool allowlist; trimmed, sorted and de-duplicated before being stored.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Allowed MCP servers; normalized like `allowed_tools`.
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    /// Allowed MCP tools; normalized like `allowed_tools`.
    #[serde(default)]
    pub allowed_mcp_tools: Vec<String>,
    /// Security profile name stored on the draft.
    #[serde(default)]
    pub security_profile: Option<String>,
    /// Workflow planner gate; `Some(false)` blocks drafting.
    #[serde(default)]
    pub workflow_planner_enabled: Option<bool>,
    /// Overrides the per-channel `strict_kb_grounding` config value when present.
    #[serde(default)]
    pub strict_kb_grounding: Option<bool>,
    /// Overrides the built-in factual-question heuristic when present.
    #[serde(default)]
    pub factual_question: Option<bool>,
    /// Overrides the built-in workflow-intent heuristic when present.
    #[serde(default)]
    pub explicit_workflow_intent: Option<bool>,
}
139
/// Request body for answering a draft's pending question.
#[derive(Debug, Deserialize)]
pub struct ChannelAutomationDraftAnswerRequest {
    /// The user's reply; must not be blank.
    pub answer: String,
    /// Replaces the draft's stored planner gate when present.
    #[serde(default)]
    pub workflow_planner_enabled: Option<bool>,
    /// Takes precedence over the draft's stored value and the channel config.
    #[serde(default)]
    pub strict_kb_grounding: Option<bool>,
    /// Overrides the factual-question heuristic applied to `answer`.
    #[serde(default)]
    pub factual_question: Option<bool>,
    /// Overrides the workflow-intent heuristic applied to `answer`.
    #[serde(default)]
    pub explicit_workflow_intent: Option<bool>,
}
152
/// Optional query-string filters for listing pending drafts. A missing filter matches everything.
#[derive(Debug, Deserialize, Default)]
pub struct ChannelAutomationDraftPendingQuery {
    /// Exact match against the draft's `channel_context.source_platform`.
    pub channel: Option<String>,
    /// Exact match against the draft's `channel_context.scope_id`.
    pub scope_id: Option<String>,
    /// Exact match against the draft's `channel_context.sender`.
    pub sender: Option<String>,
}
159
160impl AppState {
161 pub async fn load_channel_automation_drafts(&self) -> anyhow::Result<()> {
162 if !self.channel_automation_drafts_path.exists() {
163 return Ok(());
164 }
165 let raw = tokio::fs::read_to_string(&self.channel_automation_drafts_path).await?;
166 let parsed = serde_json::from_str::<HashMap<String, ChannelAutomationDraftRecord>>(&raw)
167 .unwrap_or_default();
168 *self.channel_automation_drafts.write().await = parsed;
169 Ok(())
170 }
171
172 pub async fn persist_channel_automation_drafts(&self) -> anyhow::Result<()> {
173 let payload = {
174 let guard = self.channel_automation_drafts.read().await;
175 serde_json::to_string_pretty(&*guard)?
176 };
177 if let Some(parent) = self.channel_automation_drafts_path.parent() {
178 tokio::fs::create_dir_all(parent).await?;
179 }
180 tokio::fs::write(&self.channel_automation_drafts_path, payload).await?;
181 Ok(())
182 }
183}
184
185pub(super) async fn channel_automation_drafts_start(
186 State(state): State<AppState>,
187 Json(input): Json<ChannelAutomationDraftStartRequest>,
188) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
189 let now = crate::now_ms();
190 let text = input.text.trim().to_string();
191 if text.is_empty() {
192 return Err(bad_request("draft text is required"));
193 }
194
195 let mut context = input.channel_context;
196 if context.session_id.is_none() {
197 context.session_id = input.session_id;
198 }
199 if context.thread_key.is_none() {
200 context.thread_key = input.thread_key;
201 }
202 let strict_kb_grounding = input
203 .strict_kb_grounding
204 .unwrap_or(channel_strict_kb_grounding(&state, &context.source_platform).await);
205 let factual_question = input
206 .factual_question
207 .unwrap_or_else(|| channel_draft_message_is_factual_question(&text));
208 let explicit_workflow_intent = input
209 .explicit_workflow_intent
210 .unwrap_or_else(|| channel_draft_message_has_explicit_workflow_intent(&text));
211 let mut draft = ChannelAutomationDraftRecord {
212 draft_id: format!("channel-draft-{}", Uuid::new_v4()),
213 status: ChannelAutomationDraftStatus::Collecting,
214 original_text: text.clone(),
215 goal: infer_goal(&text),
216 schedule_hint: infer_schedule_hint(&text),
217 delivery_target: Some(infer_delivery_target(&text)),
218 missing_fields: Vec::new(),
219 question: None,
220 preview: None,
221 automation_id: None,
222 allowed_tools: normalize_list(input.allowed_tools),
223 allowed_mcp_servers: normalize_list(input.allowed_mcp_servers),
224 allowed_mcp_tools: normalize_list(input.allowed_mcp_tools),
225 security_profile: input.security_profile,
226 workflow_planner_enabled: input.workflow_planner_enabled,
227 strict_kb_grounding: Some(strict_kb_grounding),
228 factual_question: Some(factual_question),
229 explicit_workflow_intent: Some(explicit_workflow_intent),
230 channel_context: context,
231 created_at_ms: now,
232 updated_at_ms: now,
233 expires_at_ms: now.saturating_add(CHANNEL_DRAFT_TTL_MS),
234 };
235 if let Some(payload) = producer_guarded_response(
236 &state,
237 &mut draft,
238 None,
239 ProducerCaller::Start,
240 strict_kb_grounding,
241 factual_question,
242 explicit_workflow_intent,
243 )
244 .await
245 {
246 return Ok(Json(payload));
247 }
248 advance_draft(&mut draft, now);
249 state
250 .channel_automation_drafts
251 .write()
252 .await
253 .insert(draft.draft_id.clone(), draft.clone());
254 persist_channel_drafts_or_log(&state).await;
255 Ok(Json(draft_response(&draft)))
256}
257
258pub(super) async fn channel_automation_drafts_answer(
259 State(state): State<AppState>,
260 Path(draft_id): Path<String>,
261 Json(input): Json<ChannelAutomationDraftAnswerRequest>,
262) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
263 let now = crate::now_ms();
264 let mut draft = get_mutable_draft(&state, &draft_id).await?;
265 ensure_open_draft(&draft, now)?;
266 let answer = input.answer.trim().to_string();
267 if answer.is_empty() {
268 return Err(bad_request("answer is required"));
269 }
270 let strict_kb_grounding = input
271 .strict_kb_grounding
272 .or(draft.strict_kb_grounding)
273 .unwrap_or(
274 channel_strict_kb_grounding(&state, &draft.channel_context.source_platform).await,
275 );
276 let factual_question = input
277 .factual_question
278 .unwrap_or_else(|| channel_draft_message_is_factual_question(&answer));
279 let explicit_workflow_intent = input
280 .explicit_workflow_intent
281 .unwrap_or_else(|| channel_draft_message_has_explicit_workflow_intent(&answer));
282 draft.workflow_planner_enabled = input
283 .workflow_planner_enabled
284 .or(draft.workflow_planner_enabled);
285 draft.strict_kb_grounding = Some(strict_kb_grounding);
286 draft.factual_question = Some(factual_question);
287 draft.explicit_workflow_intent = Some(explicit_workflow_intent);
288 if let Some(payload) = producer_guarded_response(
289 &state,
290 &mut draft,
291 Some(draft_id.as_str()),
292 ProducerCaller::Answer,
293 strict_kb_grounding,
294 factual_question,
295 explicit_workflow_intent,
296 )
297 .await
298 {
299 return Ok(Json(payload));
300 }
301 if is_cancel_text(&answer) {
302 draft.status = ChannelAutomationDraftStatus::Cancelled;
303 draft.question = None;
304 draft.updated_at_ms = now;
305 store_draft(&state, draft.clone()).await;
306 return Ok(Json(draft_response(&draft)));
307 }
308 match draft
309 .question
310 .as_ref()
311 .map(|question| question.field.as_str())
312 {
313 Some("goal") => {
314 if draft.schedule_hint.is_none() {
315 draft.schedule_hint = infer_schedule_hint(&answer);
316 }
317 draft.goal = Some(answer);
318 }
319 Some("schedule_hint") => draft.schedule_hint = Some(answer),
320 Some("delivery_target") => draft.delivery_target = Some(answer),
321 _ if draft.status == ChannelAutomationDraftStatus::PreviewReady => {}
322 _ => draft.goal = Some(answer),
323 }
324 advance_draft(&mut draft, now);
325 store_draft(&state, draft.clone()).await;
326 Ok(Json(draft_response(&draft)))
327}
328
329pub(super) async fn channel_automation_drafts_confirm(
330 State(state): State<AppState>,
331 Path(draft_id): Path<String>,
332) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
333 let now = crate::now_ms();
334 let mut draft = get_mutable_draft(&state, &draft_id).await?;
335 ensure_open_draft(&draft, now)?;
336 if draft.status != ChannelAutomationDraftStatus::PreviewReady {
337 return Err((
338 StatusCode::CONFLICT,
339 Json(json!({
340 "error": "Draft is not ready for confirmation",
341 "code": "CHANNEL_AUTOMATION_DRAFT_NOT_READY",
342 "draft": draft,
343 })),
344 ));
345 }
346 let automation = build_channel_automation(&draft, now);
347 let stored = state.put_automation_v2(automation).await.map_err(|error| {
348 (
349 StatusCode::BAD_REQUEST,
350 Json(json!({
351 "error": error.to_string(),
352 "code": "CHANNEL_AUTOMATION_CREATE_FAILED",
353 })),
354 )
355 })?;
356 draft.status = ChannelAutomationDraftStatus::Applied;
357 draft.automation_id = Some(stored.automation_id.clone());
358 draft.question = None;
359 draft.updated_at_ms = now;
360 store_draft(&state, draft.clone()).await;
361 let mut payload = draft_response(&draft);
362 if let Some(obj) = payload.as_object_mut() {
363 obj.insert("automation".to_string(), json!(stored));
364 }
365 Ok(Json(payload))
366}
367
368pub(super) async fn channel_automation_drafts_cancel(
369 State(state): State<AppState>,
370 Path(draft_id): Path<String>,
371) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
372 let now = crate::now_ms();
373 let mut draft = get_mutable_draft(&state, &draft_id).await?;
374 draft.status = ChannelAutomationDraftStatus::Cancelled;
375 draft.question = None;
376 draft.updated_at_ms = now;
377 store_draft(&state, draft.clone()).await;
378 Ok(Json(draft_response(&draft)))
379}
380
381pub(super) async fn channel_automation_drafts_pending(
382 State(state): State<AppState>,
383 Query(query): Query<ChannelAutomationDraftPendingQuery>,
384) -> Json<Value> {
385 let now = crate::now_ms();
386 let drafts = state
387 .channel_automation_drafts
388 .read()
389 .await
390 .values()
391 .filter(|draft| draft.expires_at_ms > now)
392 .filter(|draft| {
393 query
394 .channel
395 .as_deref()
396 .map(|value| draft.channel_context.source_platform == value)
397 .unwrap_or(true)
398 })
399 .filter(|draft| {
400 query
401 .scope_id
402 .as_deref()
403 .map(|value| draft.channel_context.scope_id == value)
404 .unwrap_or(true)
405 })
406 .filter(|draft| {
407 query
408 .sender
409 .as_deref()
410 .map(|value| draft.channel_context.sender == value)
411 .unwrap_or(true)
412 })
413 .filter(|draft| {
414 matches!(
415 draft.status,
416 ChannelAutomationDraftStatus::Collecting
417 | ChannelAutomationDraftStatus::PreviewReady
418 | ChannelAutomationDraftStatus::Blocked
419 )
420 })
421 .cloned()
422 .collect::<Vec<_>>();
423 let count = drafts.len();
424 Json(json!({
425 "drafts": drafts,
426 "count": count,
427 }))
428}
429
430async fn get_mutable_draft(
431 state: &AppState,
432 draft_id: &str,
433) -> Result<ChannelAutomationDraftRecord, (StatusCode, Json<Value>)> {
434 state
435 .channel_automation_drafts
436 .read()
437 .await
438 .get(draft_id)
439 .cloned()
440 .ok_or_else(|| {
441 (
442 StatusCode::NOT_FOUND,
443 Json(json!({
444 "error": "Channel automation draft not found",
445 "code": "CHANNEL_AUTOMATION_DRAFT_NOT_FOUND",
446 "draft_id": draft_id,
447 })),
448 )
449 })
450}
451
452async fn store_draft(state: &AppState, draft: ChannelAutomationDraftRecord) {
453 state
454 .channel_automation_drafts
455 .write()
456 .await
457 .insert(draft.draft_id.clone(), draft);
458 persist_channel_drafts_or_log(state).await;
459}
460
461async fn persist_channel_drafts_or_log(state: &AppState) {
462 if let Err(error) = state.persist_channel_automation_drafts().await {
463 tracing::warn!("failed to persist channel automation drafts: {error}");
464 }
465}
466
467fn ensure_open_draft(
468 draft: &ChannelAutomationDraftRecord,
469 now: u64,
470) -> Result<(), (StatusCode, Json<Value>)> {
471 if draft.expires_at_ms <= now {
472 return Err((
473 StatusCode::GONE,
474 Json(json!({
475 "error": "Channel automation draft expired",
476 "code": "CHANNEL_AUTOMATION_DRAFT_EXPIRED",
477 "draft_id": draft.draft_id,
478 })),
479 ));
480 }
481 match draft.status {
482 ChannelAutomationDraftStatus::Cancelled
483 | ChannelAutomationDraftStatus::Expired
484 | ChannelAutomationDraftStatus::Applied => Err((
485 StatusCode::CONFLICT,
486 Json(json!({
487 "error": "Channel automation draft is closed",
488 "code": "CHANNEL_AUTOMATION_DRAFT_CLOSED",
489 "draft_id": draft.draft_id,
490 "status": draft.status.clone(),
491 })),
492 )),
493 ChannelAutomationDraftStatus::Collecting
494 | ChannelAutomationDraftStatus::PreviewReady
495 | ChannelAutomationDraftStatus::Blocked => Ok(()),
496 }
497}
498
499fn advance_draft(draft: &mut ChannelAutomationDraftRecord, now: u64) {
500 draft.updated_at_ms = now;
501 draft.expires_at_ms = now.saturating_add(CHANNEL_DRAFT_TTL_MS);
502 draft.missing_fields.clear();
503 draft.question = None;
504 draft.preview = None;
505
506 if draft
507 .goal
508 .as_deref()
509 .map(str::trim)
510 .unwrap_or("")
511 .is_empty()
512 {
513 draft.status = ChannelAutomationDraftStatus::Collecting;
514 draft.missing_fields.push("goal".to_string());
515 draft.question = Some(ChannelAutomationDraftQuestion {
516 field: "goal".to_string(),
517 text: "What should this automation do?".to_string(),
518 options: Vec::new(),
519 });
520 return;
521 }
522 if draft
523 .schedule_hint
524 .as_deref()
525 .map(str::trim)
526 .unwrap_or("")
527 .is_empty()
528 {
529 draft.status = ChannelAutomationDraftStatus::Collecting;
530 draft.missing_fields.push("schedule_hint".to_string());
531 draft.question = Some(ChannelAutomationDraftQuestion {
532 field: "schedule_hint".to_string(),
533 text: "When should it run, or what event should trigger it?".to_string(),
534 options: vec![
535 "daily".to_string(),
536 "weekly".to_string(),
537 "when something changes".to_string(),
538 ],
539 });
540 return;
541 }
542
543 let goal = draft.goal.clone().unwrap_or_default();
544 let schedule_hint = draft.schedule_hint.clone().unwrap_or_default();
545 let delivery_target = draft
546 .delivery_target
547 .clone()
548 .unwrap_or_else(|| "same_chat".to_string());
549 draft.status = ChannelAutomationDraftStatus::PreviewReady;
550 draft.preview = Some(ChannelAutomationDraftPreview {
551 summary: format!(
552 "Run `{}` on `{}` and report to `{}`.",
553 truncate_for_label(&goal, 80),
554 schedule_hint,
555 delivery_target
556 ),
557 goal,
558 schedule_hint,
559 delivery_target,
560 });
561}
562
563fn draft_response(draft: &ChannelAutomationDraftRecord) -> Value {
564 let message = format_channel_draft_message(draft);
565 log_channel_automation_draft_response_producer(
566 draft,
567 None,
568 draft.workflow_planner_enabled,
569 draft.strict_kb_grounding.unwrap_or(false),
570 draft.factual_question.unwrap_or(false),
571 draft.explicit_workflow_intent.unwrap_or(false),
572 false,
573 true,
574 "draft_response",
575 );
576 json!({
577 "draft": draft,
578 "message": message,
579 })
580}
581
/// Identifies which handler invoked the producer guard, for log attribution.
#[derive(Debug, Clone, Copy)]
enum ProducerCaller {
    /// The draft-start handler.
    Start,
    /// The draft-answer handler.
    Answer,
}

impl ProducerCaller {
    /// Returns the name of the calling handler, used as the log `reason` when the guard
    /// lets a message through.
    fn label(self) -> &'static str {
        const START_LABEL: &str = "channel_automation_drafts_start";
        const ANSWER_LABEL: &str = "channel_automation_drafts_answer";
        match self {
            ProducerCaller::Start => START_LABEL,
            ProducerCaller::Answer => ANSWER_LABEL,
        }
    }
}
596
597async fn producer_guarded_response(
598 state: &AppState,
599 draft: &mut ChannelAutomationDraftRecord,
600 pending_draft_id: Option<&str>,
601 caller: ProducerCaller,
602 strict_kb_grounding: bool,
603 factual_question: bool,
604 explicit_workflow_intent: bool,
605) -> Option<Value> {
606 let workflow_planner_enabled = draft.workflow_planner_enabled.unwrap_or(true);
607 let block_reason = if !workflow_planner_enabled {
608 Some((
609 "workflow_drafting_disabled",
610 CHANNEL_WORKFLOW_DRAFTING_DISABLED_MESSAGE,
611 ))
612 } else if strict_kb_grounding && factual_question && !explicit_workflow_intent {
613 Some((
614 "strict_kb_factual_question",
615 STRICT_KB_FACTUAL_DRAFT_BLOCKED_MESSAGE,
616 ))
617 } else {
618 None
619 };
620
621 let Some((reason, message)) = block_reason else {
622 log_channel_automation_draft_response_producer(
623 draft,
624 pending_draft_id,
625 Some(workflow_planner_enabled),
626 strict_kb_grounding,
627 factual_question,
628 explicit_workflow_intent,
629 false,
630 false,
631 caller.label(),
632 );
633 return None;
634 };
635
636 draft.status = ChannelAutomationDraftStatus::Cancelled;
637 draft.goal = None;
638 draft.schedule_hint = None;
639 draft.delivery_target = None;
640 draft.missing_fields.clear();
641 draft.question = None;
642 draft.preview = None;
643 draft.updated_at_ms = crate::now_ms();
644 draft.workflow_planner_enabled = Some(workflow_planner_enabled);
645 draft.strict_kb_grounding = Some(strict_kb_grounding);
646 draft.factual_question = Some(factual_question);
647 draft.explicit_workflow_intent = Some(explicit_workflow_intent);
648 store_draft(state, draft.clone()).await;
649 log_channel_automation_draft_response_producer(
650 draft,
651 pending_draft_id,
652 Some(workflow_planner_enabled),
653 strict_kb_grounding,
654 factual_question,
655 explicit_workflow_intent,
656 true,
657 false,
658 reason,
659 );
660 Some(json!({
661 "draft": draft,
662 "message": message,
663 "blocked": true,
664 "block_reason": reason,
665 }))
666}
667
668async fn channel_strict_kb_grounding(state: &AppState, channel: &str) -> bool {
669 let channel = channel.trim().to_ascii_lowercase();
670 if channel.is_empty() {
671 return false;
672 }
673 state
674 .config
675 .get_effective_value()
676 .await
677 .get("channels")
678 .and_then(Value::as_object)
679 .and_then(|channels| channels.get(&channel))
680 .and_then(Value::as_object)
681 .and_then(|cfg| cfg.get("strict_kb_grounding"))
682 .and_then(Value::as_bool)
683 .unwrap_or(false)
684}
685
/// Heuristic: does the message look like a factual question?
///
/// True when the trimmed, ASCII-lowercased text is non-empty, is not a slash command,
/// contains a `?`, and begins with one of a fixed set of question starters.
fn channel_draft_message_is_factual_question(message: &str) -> bool {
    const QUESTION_STARTERS: &[&str] = &[
        "what ", "who ", "where ", "when ", "which ", "can ", "could ", "does ", "do ", "is ",
        "are ", "how ", "tell me ", "explain ",
    ];

    let normalized = message.trim().to_ascii_lowercase();
    let is_candidate =
        !normalized.is_empty() && !normalized.starts_with('/') && normalized.contains('?');
    is_candidate
        && QUESTION_STARTERS
            .iter()
            .any(|starter| normalized.starts_with(starter))
}
698
/// Heuristic: does the message explicitly ask for a workflow or automation?
///
/// Blank text and slash commands never qualify. Otherwise the trimmed, ASCII-lowercased text
/// qualifies when it contains a known authoring phrase, or both a workflow noun and an
/// authoring verb, or starts with `monitor ` and mentions a cadence. All checks are plain
/// substring matches.
fn channel_draft_message_has_explicit_workflow_intent(message: &str) -> bool {
    const EXPLICIT_PHRASES: &[&str] = &[
        "create a workflow",
        "create workflow",
        "build a workflow",
        "build workflow",
        "set up a workflow",
        "setup a workflow",
        "make a workflow",
        "create an automation",
        "create automation",
        "build an automation",
        "build automation",
        "schedule a workflow",
        "schedule a daily report",
        "schedule a report",
        "schedule a reminder",
        "set up a bot",
        "setup a bot",
        "make a bot that runs",
        "create a bot that runs",
    ];
    const WORKFLOW_TARGETS: &[&str] = &["workflow", "automation", "automations", "bot", "reminder"];
    const AUTHORING_VERBS: &[&str] = &[
        "create", "build", "make", "draft", "schedule", "automate", "set up", "setup",
    ];
    const CADENCE_WORDS: &[&str] = &[
        " every ",
        " each ",
        "daily",
        "weekly",
        "hourly",
        "every morning",
    ];

    let text = message.trim().to_ascii_lowercase();
    if text.is_empty() || text.starts_with('/') {
        return false;
    }
    let mentions_any = |needles: &[&str]| needles.iter().any(|needle| text.contains(needle));

    if mentions_any(EXPLICIT_PHRASES) {
        return true;
    }
    if mentions_any(WORKFLOW_TARGETS) && mentions_any(AUTHORING_VERBS) {
        return true;
    }
    text.starts_with("monitor ") && mentions_any(CADENCE_WORDS)
}
750
/// Emits one structured `tracing` event (at WARN level) describing a producer decision for
/// a draft: which draft and channel it concerns, the guard inputs, whether the message was
/// `blocked`, whether a draft response was `emitted`, and a free-form `reason`.
///
/// The draft's `source_platform` is logged under both the `channel` and `platform` fields.
// Many arguments are accepted on purpose so every guard input appears in the log event.
#[allow(clippy::too_many_arguments)]
fn log_channel_automation_draft_response_producer(
    draft: &ChannelAutomationDraftRecord,
    pending_draft_id: Option<&str>,
    workflow_planner_enabled: Option<bool>,
    strict_kb_grounding: bool,
    factual_question: bool,
    explicit_workflow_intent: bool,
    blocked: bool,
    emitted: bool,
    reason: &str,
) {
    tracing::warn!(
        prefix = "CHANNEL_AUTOMATION_DRAFT_RESPONSE_PRODUCER",
        channel = %draft.channel_context.source_platform,
        platform = %draft.channel_context.source_platform,
        session_id = ?draft.channel_context.session_id,
        scope_id = %draft.channel_context.scope_id,
        draft_id = %draft.draft_id,
        pending_draft_id = ?pending_draft_id,
        workflow_planner_enabled = ?workflow_planner_enabled,
        strict_kb_grounding,
        factual_question,
        explicit_workflow_intent,
        blocked,
        emitted,
        reason,
        "CHANNEL_AUTOMATION_DRAFT_RESPONSE_PRODUCER"
    );
}
781
782fn format_channel_draft_message(draft: &ChannelAutomationDraftRecord) -> String {
783 match draft.status {
784 ChannelAutomationDraftStatus::Collecting => {
785 let question = draft
786 .question
787 .as_ref()
788 .map(|question| {
789 let mut lines = vec![question.text.clone()];
790 for (index, option) in question.options.iter().enumerate() {
791 lines.push(format!("{}. {}", index + 1, option));
792 }
793 lines.join("\n")
794 })
795 .unwrap_or_else(|| {
796 "I need one more detail before I can draft this automation.".to_string()
797 });
798 format!("{question}\nReply here with the answer, or reply `cancel` to stop.")
799 }
800 ChannelAutomationDraftStatus::PreviewReady => {
801 let preview = draft.preview.as_ref();
802 let summary = preview
803 .map(|value| value.summary.clone())
804 .unwrap_or_else(|| "Automation draft is ready.".to_string());
805 format!("{summary}\nReply `confirm` to create it, or `cancel` to stop.")
806 }
807 ChannelAutomationDraftStatus::Applied => {
808 let id = draft.automation_id.as_deref().unwrap_or("unknown");
809 format!("Automation created: `{id}`")
810 }
811 ChannelAutomationDraftStatus::Cancelled => "Automation draft cancelled.".to_string(),
812 ChannelAutomationDraftStatus::Expired => "Automation draft expired.".to_string(),
813 ChannelAutomationDraftStatus::Blocked => {
814 "Automation draft is blocked until required channel setup is completed.".to_string()
815 }
816 }
817}
818
819fn build_channel_automation(draft: &ChannelAutomationDraftRecord, now: u64) -> AutomationV2Spec {
820 let goal = draft
821 .goal
822 .clone()
823 .unwrap_or_else(|| draft.original_text.clone());
824 let schedule_hint = draft.schedule_hint.clone().unwrap_or_default();
825 let automation_id = format!("automation-v2-{}", Uuid::new_v4());
826 let agent_id = format!("agent-{}", Uuid::new_v4());
827 let output_target = format!(
828 "channel:{}:{}",
829 empty_as_unknown(&draft.channel_context.source_platform),
830 empty_as_unknown(&draft.channel_context.scope_id)
831 );
832 let metadata = json!({
833 "created_from": "channel_automation_draft",
834 "draft_id": draft.draft_id,
835 "channel_context": draft.channel_context,
836 "schedule_hint": schedule_hint,
837 "delivery_target": draft.delivery_target,
838 "security_profile": draft.security_profile,
839 "allowed_tools": draft.allowed_tools,
840 "allowed_mcp_servers": draft.allowed_mcp_servers,
841 "allowed_mcp_tools": draft.allowed_mcp_tools,
842 "confirmation_required": true,
843 "confirmed_at_ms": now,
844 });
845 AutomationV2Spec {
846 automation_id,
847 name: automation_name_from_goal(&goal),
848 description: Some(format!("Created from channel chat: {goal}")),
849 status: AutomationV2Status::Active,
850 schedule: schedule_from_hint(&schedule_hint),
851 knowledge: tandem_orchestrator::KnowledgeBinding::default(),
852 agents: vec![AutomationAgentProfile {
853 agent_id: agent_id.clone(),
854 template_id: None,
855 display_name: "Channel Automation Agent".to_string(),
856 avatar_url: None,
857 model_policy: None,
858 skills: Vec::new(),
859 tool_policy: AutomationAgentToolPolicy {
860 allowlist: draft.allowed_tools.clone(),
861 denylist: Vec::new(),
862 },
863 mcp_policy: AutomationAgentMcpPolicy {
864 allowed_servers: draft.allowed_mcp_servers.clone(),
865 allowed_tools: if draft.allowed_mcp_tools.is_empty() {
866 None
867 } else {
868 Some(draft.allowed_mcp_tools.clone())
869 },
870 },
871 approval_policy: None,
872 }],
873 flow: AutomationFlowSpec {
874 nodes: vec![AutomationFlowNode {
875 node_id: "channel_automation_task".to_string(),
876 agent_id,
877 objective: format!(
878 "{goal}\n\nReport results back to the originating chat context."
879 ),
880 knowledge: tandem_orchestrator::KnowledgeBinding::default(),
881 depends_on: Vec::new(),
882 input_refs: Vec::new(),
883 output_contract: Some(AutomationFlowOutputContract {
884 kind: "channel_response".to_string(),
885 validator: Some(AutomationOutputValidatorKind::GenericArtifact),
886 enforcement: None,
887 schema: None,
888 summary_guidance: Some(
889 "Summarize the outcome in channel-safe language.".to_string(),
890 ),
891 }),
892 retry_policy: None,
893 timeout_ms: None,
894 max_tool_calls: Some(16),
895 stage_kind: None,
896 gate: None,
897 metadata: Some(json!({
898 "created_from": "channel_automation_draft",
899 "source_platform": draft.channel_context.source_platform,
900 "source_scope_id": draft.channel_context.scope_id,
901 })),
902 }],
903 },
904 execution: AutomationExecutionPolicy {
905 max_parallel_agents: Some(1),
906 max_total_runtime_ms: Some(15 * 60 * 1000),
907 max_total_tool_calls: Some(24),
908 max_total_tokens: None,
909 max_total_cost_usd: None,
910 },
911 output_targets: vec![output_target],
912 created_at_ms: now,
913 updated_at_ms: now,
914 creator_id: if draft.channel_context.sender.trim().is_empty() {
915 "channel".to_string()
916 } else {
917 draft.channel_context.sender.clone()
918 },
919 workspace_root: None,
920 metadata: Some(metadata),
921 next_fire_at_ms: None,
922 last_fired_at_ms: None,
923 scope_policy: None,
924 watch_conditions: Vec::new(),
925 handoff_config: None,
926 }
927}
928
929fn schedule_from_hint(hint: &str) -> AutomationV2Schedule {
930 let lower = hint.to_ascii_lowercase();
931 if lower.contains("daily") || lower.contains("every day") {
932 if let Some((hour, minute)) = parse_daily_time_hint(&lower) {
933 return AutomationV2Schedule {
934 schedule_type: AutomationV2ScheduleType::Cron,
935 cron_expression: Some(format!("0 {minute} {hour} * * * *")),
936 interval_seconds: None,
937 timezone: "UTC".to_string(),
938 misfire_policy: RoutineMisfirePolicy::RunOnce,
939 };
940 }
941 }
942 let interval_seconds = if lower.contains("hourly") || lower.contains("every hour") {
943 Some(60 * 60)
944 } else if lower.contains("weekly") || lower.contains("every week") {
945 Some(7 * 24 * 60 * 60)
946 } else if lower.contains("daily") || lower.contains("every day") {
947 Some(24 * 60 * 60)
948 } else {
949 None
950 };
951 AutomationV2Schedule {
952 schedule_type: interval_seconds
953 .map(|_| AutomationV2ScheduleType::Interval)
954 .unwrap_or(AutomationV2ScheduleType::Manual),
955 cron_expression: None,
956 interval_seconds,
957 timezone: "UTC".to_string(),
958 misfire_policy: RoutineMisfirePolicy::RunOnce,
959 }
960}
961
/// Extracts a 24-hour `(hour, minute)` from an already-lowercased schedule hint.
///
/// The time is the first word after the first `" at "` (or, when there is none, the first
/// `" around "`). Accepted forms are `H`, `H:MM`, with an optional `am`/`pm` either attached
/// (`9pm`) or as the following word (`9 pm`). Trailing `.`/`,` are ignored.
///
/// Returns `None` when no marker is present or the time cannot be parsed or is out of range.
fn parse_daily_time_hint(lower: &str) -> Option<(u32, u32)> {
    fn is_trailing_punct(ch: char) -> bool {
        ch == '.' || ch == ','
    }

    let marker = lower
        .find(" at ")
        .map(|index| index + 4)
        .or_else(|| lower.find(" around ").map(|index| index + 8))?;
    let mut words = lower.get(marker..)?.split_whitespace();
    let token = words.next().unwrap_or("").trim_matches(is_trailing_punct);
    if token.is_empty() {
        return None;
    }

    let attached_pm = token.ends_with("pm");
    let attached_am = token.ends_with("am");
    // When the meridiem is not attached to the number ("9 pm"), look at the next word;
    // otherwise "9 pm" would be scheduled at 09:00 instead of 21:00.
    let (is_pm, is_am) = if attached_pm || attached_am {
        (attached_pm, attached_am)
    } else {
        let next_word = words.next().map(|word| word.trim_matches(is_trailing_punct));
        (next_word == Some("pm"), next_word == Some("am"))
    };

    let numeric = token.trim_end_matches("am").trim_end_matches("pm");
    let mut pieces = numeric.split(':');
    let mut hour = pieces.next()?.parse::<u32>().ok()?;
    let minute = match pieces.next() {
        Some(value) => value.parse::<u32>().ok()?,
        None => 0,
    };
    if hour > 23 || minute > 59 {
        return None;
    }
    if is_pm && hour < 12 {
        hour += 12;
    } else if is_am && hour == 12 {
        // 12am is midnight.
        hour = 0;
    }
    Some((hour, minute))
}
995
/// Derives the automation goal from the opening message, if it states one.
///
/// Returns `None` for generic requests with no content ("create an automation") and for
/// goals shorter than 8 characters. A leading authoring phrase such as "create an automation
/// that" or "automate" is stripped, but only when it ends at a word boundary, so that
/// "automated weekly digest" is not cut down to "d weekly digest".
fn infer_goal(text: &str) -> Option<String> {
    let mut candidate = text.trim();
    let lower = candidate.to_ascii_lowercase();
    let generic = [
        "create an automation",
        "create automation",
        "make an automation",
        "set up an automation",
        "setup an automation",
        "automate this",
    ];
    if generic.iter().any(|value| lower == *value) {
        return None;
    }
    let prefixes = [
        "create an automation that",
        "create automation that",
        "make an automation that",
        "set up an automation that",
        "setup an automation that",
        "automate",
        "please automate",
    ];
    for prefix in prefixes {
        let Some(rest) = lower.strip_prefix(prefix) else {
            continue;
        };
        // The prefix must be a whole phrase, not the start of a longer word.
        let at_word_boundary = rest.chars().next().map_or(true, |ch| !ch.is_alphanumeric());
        if at_word_boundary {
            // ASCII lowercasing keeps byte offsets, so the prefix length is valid here too.
            candidate = candidate.get(prefix.len()..).unwrap_or(candidate).trim();
            break;
        }
    }
    if candidate.chars().count() < 8 {
        None
    } else {
        Some(candidate.to_string())
    }
}
1031
1032fn infer_schedule_hint(text: &str) -> Option<String> {
1033 let lower = text.to_ascii_lowercase();
1034 if lower.contains("hourly") || lower.contains("every hour") {
1035 return Some("hourly".to_string());
1036 }
1037 if lower.contains("weekly") || lower.contains("every week") {
1038 return Some("weekly".to_string());
1039 }
1040 if lower.contains("daily") || lower.contains("every day") || lower.contains("each day") {
1041 return Some(extract_daily_hint(text));
1042 }
1043 if lower.starts_with("when ")
1044 || lower.contains(" whenever ")
1045 || lower.contains(" when ")
1046 || lower.contains(" on new ")
1047 || lower.contains(" new issue")
1048 {
1049 return Some("event-driven".to_string());
1050 }
1051 None
1052}
1053
/// Builds a daily schedule hint, including the time of day when the text states one.
///
/// Every `" at "` occurrence is examined first, then every `" around "` occurrence. The
/// following word is accepted as a time only when it starts with a digit, and the word
/// after it is kept only when it is `am`/`pm`. This keeps unrelated phrases out of the
/// hint: "look at issues daily" yields `daily`, and "daily at 9am and post it" yields
/// `daily at 9am`. Trailing `.`/`,` are dropped. Falls back to plain `daily`.
fn extract_daily_hint(text: &str) -> String {
    fn is_trailing_punct(ch: char) -> bool {
        ch == '.' || ch == ','
    }

    let lower = text.to_ascii_lowercase();
    for marker in [" at ", " around "] {
        for (index, _) in lower.match_indices(marker) {
            // ASCII lowercasing keeps byte offsets, so `index` is valid in `text` as well.
            let Some(tail) = text.get(index + marker.len()..) else {
                continue;
            };
            let mut words = tail.split_whitespace();
            let Some(time) = words.next().map(|word| word.trim_matches(is_trailing_punct)) else {
                continue;
            };
            if !time.starts_with(|ch: char| ch.is_ascii_digit()) {
                continue;
            }
            let meridiem = words
                .next()
                .map(|word| word.trim_matches(is_trailing_punct))
                .filter(|word| word.eq_ignore_ascii_case("am") || word.eq_ignore_ascii_case("pm"));
            return match meridiem {
                Some(meridiem) => format!("daily at {time} {meridiem}"),
                None => format!("daily at {time}"),
            };
        }
    }
    "daily".to_string()
}
1073
/// Determines where results should be delivered.
///
/// Only the originating chat (`same_chat`) is supported, so the text is not inspected. The
/// previous phrase check ("here", "this channel", "this chat") returned the same value in
/// both branches and has been removed as dead code.
fn infer_delivery_target(text: &str) -> String {
    // Kept in the signature so other targets can be inferred from the text later.
    let _ = text;
    "same_chat".to_string()
}
1082
1083fn automation_name_from_goal(goal: &str) -> String {
1084 let label = truncate_for_label(goal.trim(), 56);
1085 if label.is_empty() {
1086 "Channel automation".to_string()
1087 } else {
1088 label
1089 }
1090}
1091
/// Shortens `value` to at most `max_chars` characters (counted as `char`s, not bytes).
///
/// Values that already fit are returned unchanged. Longer values are cut and end in `...`,
/// with the ellipsis counted towards the limit. When the limit is too small to hold the
/// ellipsis (`max_chars < 3`) the value is simply cut to `max_chars` characters, so the
/// result never exceeds the limit.
fn truncate_for_label(value: &str, max_chars: usize) -> String {
    const ELLIPSIS: &str = "...";
    if value.chars().count() <= max_chars {
        return value.to_string();
    }
    if max_chars < ELLIPSIS.len() {
        // No room for the ellipsis: a hard cut is the only way to respect the limit.
        return value.chars().take(max_chars).collect();
    }
    let mut clipped: String = value.chars().take(max_chars - ELLIPSIS.len()).collect();
    clipped.push_str(ELLIPSIS);
    clipped
}
1103
/// Cleans up a list of names: each entry is trimmed, blank entries are dropped, and the
/// remainder is returned sorted with duplicates removed.
fn normalize_list(values: Vec<String>) -> Vec<String> {
    let mut cleaned: Vec<String> = values
        .iter()
        .filter_map(|raw| {
            let trimmed = raw.trim();
            (!trimmed.is_empty()).then(|| trimmed.to_string())
        })
        .collect();
    // Equal strings are indistinguishable, so an unstable sort gives the same result.
    cleaned.sort_unstable();
    cleaned.dedup();
    cleaned
}
1114
/// Returns `"unknown"` for a blank (empty or whitespace-only) value; any other value is
/// returned as given, without trimming.
fn empty_as_unknown(value: &str) -> &str {
    match value.trim() {
        "" => "unknown",
        _ => value,
    }
}
1122
/// True when the whole reply, after trimming and ASCII-lowercasing, is exactly one of the
/// recognised cancel phrases.
fn is_cancel_text(value: &str) -> bool {
    const CANCEL_PHRASES: [&str; 5] = ["cancel", "stop", "abort", "never mind", "nevermind"];
    let normalized = value.trim().to_ascii_lowercase();
    CANCEL_PHRASES.contains(&normalized.as_str())
}
1129
1130fn bad_request(detail: &str) -> (StatusCode, Json<Value>) {
1131 (
1132 StatusCode::BAD_REQUEST,
1133 Json(json!({
1134 "error": "Invalid channel automation draft request",
1135 "code": "CHANNEL_AUTOMATION_DRAFT_INVALID",
1136 "detail": detail,
1137 })),
1138 )
1139}