Skip to main content

tandem_server/http/
channel_automation_drafts.rs

1use std::collections::HashMap;
2
3use axum::{
4    extract::{Path, Query, State},
5    http::StatusCode,
6    Json,
7};
8use serde::{Deserialize, Serialize};
9use serde_json::{json, Value};
10use uuid::Uuid;
11
12use crate::{
13    AppState, AutomationAgentMcpPolicy, AutomationAgentProfile, AutomationAgentToolPolicy,
14    AutomationExecutionPolicy, AutomationFlowNode, AutomationFlowOutputContract,
15    AutomationFlowSpec, AutomationOutputValidatorKind, AutomationV2Schedule,
16    AutomationV2ScheduleType, AutomationV2Spec, AutomationV2Status, RoutineMisfirePolicy,
17};
18
/// Lifetime of a channel automation draft in milliseconds (10 minutes). It is added to the
/// current timestamp to compute a draft's `expires_at_ms`.
const CHANNEL_DRAFT_TTL_MS: u64 = 10 * 60 * 1000;
/// Message returned when the producer guard blocks a draft because the workflow planner
/// is disabled for it.
const CHANNEL_WORKFLOW_DRAFTING_DISABLED_MESSAGE: &str =
    "Workflow drafting is disabled for this channel. Enable the workflow planner gate in Settings to continue.";
/// Message returned when the producer guard blocks a factual question under strict
/// knowledgebase grounding.
const STRICT_KB_FACTUAL_DRAFT_BLOCKED_MESSAGE: &str =
    "I do not see that in the connected knowledgebase.";
24
/// Lifecycle state of a channel automation draft; serialized in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ChannelAutomationDraftStatus {
    /// A required field (goal or schedule hint) is still missing and a question is pending.
    Collecting,
    /// Goal and schedule hint are present; a preview exists and the draft can be confirmed.
    PreviewReady,
    /// The draft was confirmed and an automation was stored for it.
    Applied,
    /// The draft was cancelled, either explicitly or by the producer guard blocking it.
    Cancelled,
    /// Treated as a closed state by `ensure_open_draft`.
    /// NOTE(review): no code in this part of the file assigns this variant — confirm where
    /// (or whether) it is set.
    Expired,
    /// Treated as an open state and included in pending listings.
    /// NOTE(review): no code in this part of the file assigns this variant — confirm.
    Blocked,
}
35
/// Chat-channel context a draft was created in.
///
/// Every field is optional on the wire: strings default to empty and the `Option`
/// fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ChannelAutomationDraftChannelContext {
    /// Channel/platform name; used as the key for per-channel config lookups and as the
    /// `channel` filter of the pending-drafts query.
    #[serde(default)]
    pub source_platform: String,
    // NOTE(review): semantics of `scope_kind` are not visible in this file section.
    #[serde(default)]
    pub scope_kind: String,
    /// Scope identifier within the platform; matched by the `scope_id` pending filter.
    #[serde(default)]
    pub scope_id: String,
    // NOTE(review): presumably where replies are sent — confirm against the caller.
    #[serde(default)]
    pub reply_target: String,
    /// Sender identifier; matched by the `sender` pending filter.
    #[serde(default)]
    pub sender: String,
    /// Session id; filled from the start request's top-level `session_id` when absent here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_id: Option<String>,
    /// Thread key; filled from the start request's top-level `thread_key` when absent here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub thread_key: Option<String>,
    // NOTE(review): not read anywhere in the visible code — confirm intended use.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trigger_source: Option<String>,
}
55
/// A follow-up question asked while a draft is still collecting required fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelAutomationDraftQuestion {
    /// Name of the draft field the answer will fill (e.g. `goal`, `schedule_hint`).
    pub field: String,
    /// Question text shown to the user.
    pub text: String,
    /// Suggested answers; rendered as a numbered list in the draft message.
    #[serde(default)]
    pub options: Vec<String>,
}
63
/// Preview of the automation that confirming the draft would create.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelAutomationDraftPreview {
    /// One-line human-readable summary built from goal, schedule hint and delivery target.
    pub summary: String,
    /// Goal the automation will pursue.
    pub goal: String,
    /// Free-form schedule or trigger description.
    pub schedule_hint: String,
    /// Where results are reported; `same_chat` when the draft has no delivery target.
    pub delivery_target: String,
}
71
/// Persisted state of one channel automation draft.
///
/// Records are kept in an in-memory map keyed by `draft_id` and written to disk as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChannelAutomationDraftRecord {
    /// Unique id of the form `channel-draft-<uuid>`.
    pub draft_id: String,
    /// Current lifecycle state.
    pub status: ChannelAutomationDraftStatus,
    /// Trimmed text of the request that started the draft.
    pub original_text: String,
    /// What the automation should do; inferred at start or supplied by an answer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub goal: Option<String>,
    /// When the automation should run; inferred at start or supplied by an answer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub schedule_hint: Option<String>,
    /// Where results should be delivered.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub delivery_target: Option<String>,
    /// Names of required fields still missing; rebuilt each time the draft is advanced.
    #[serde(default)]
    pub missing_fields: Vec<String>,
    /// Question currently awaiting an answer, if any.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub question: Option<ChannelAutomationDraftQuestion>,
    /// Preview, present once the draft is ready for confirmation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub preview: Option<ChannelAutomationDraftPreview>,
    /// Id of the automation created on confirmation.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub automation_id: Option<String>,
    /// Tool allowlist copied into the created automation's agent tool policy.
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// MCP servers copied into the created automation's agent MCP policy.
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    /// MCP tools copied into the agent MCP policy; an empty list becomes `None` there.
    #[serde(default)]
    pub allowed_mcp_tools: Vec<String>,
    /// Security profile label recorded in the created automation's metadata.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub security_profile: Option<String>,
    /// Workflow planner gate; the producer guard treats `None` as enabled.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub workflow_planner_enabled: Option<bool>,
    /// Strict knowledgebase grounding flag last resolved for this draft.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub strict_kb_grounding: Option<bool>,
    /// Whether the most recent message was classified as a factual question.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub factual_question: Option<bool>,
    /// Whether the most recent message showed explicit workflow-authoring intent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub explicit_workflow_intent: Option<bool>,
    /// Channel the draft originated from.
    pub channel_context: ChannelAutomationDraftChannelContext,
    /// Creation timestamp in milliseconds.
    pub created_at_ms: u64,
    /// Timestamp of the last mutation in milliseconds.
    pub updated_at_ms: u64,
    /// Timestamp (milliseconds) at or after which the draft counts as expired.
    pub expires_at_ms: u64,
}
112
/// Request body for starting a channel automation draft.
#[derive(Debug, Deserialize)]
pub struct ChannelAutomationDraftStartRequest {
    /// Free-form chat text describing the automation; must be non-empty after trimming.
    pub text: String,
    /// Fallback session id, used only when `channel_context.session_id` is absent.
    #[serde(default)]
    pub session_id: Option<String>,
    /// Fallback thread key, used only when `channel_context.thread_key` is absent.
    #[serde(default)]
    pub thread_key: Option<String>,
    /// Channel the request originated from.
    #[serde(default)]
    pub channel_context: ChannelAutomationDraftChannelContext,
    /// Tool allowlist for the eventual automation (normalized before storing).
    #[serde(default)]
    pub allowed_tools: Vec<String>,
    /// Allowed MCP servers for the eventual automation (normalized before storing).
    #[serde(default)]
    pub allowed_mcp_servers: Vec<String>,
    /// Allowed MCP tools for the eventual automation (normalized before storing).
    #[serde(default)]
    pub allowed_mcp_tools: Vec<String>,
    /// Security profile label stored on the draft.
    #[serde(default)]
    pub security_profile: Option<String>,
    /// Workflow planner gate; when absent the producer guard treats it as enabled.
    #[serde(default)]
    pub workflow_planner_enabled: Option<bool>,
    /// Overrides the per-channel `strict_kb_grounding` config value when present.
    #[serde(default)]
    pub strict_kb_grounding: Option<bool>,
    /// Overrides the text-based factual-question classification when present.
    #[serde(default)]
    pub factual_question: Option<bool>,
    /// Overrides the text-based workflow-intent classification when present.
    #[serde(default)]
    pub explicit_workflow_intent: Option<bool>,
}
139
/// Request body for answering the pending question of a draft.
#[derive(Debug, Deserialize)]
pub struct ChannelAutomationDraftAnswerRequest {
    /// The user's reply; must be non-empty after trimming.
    pub answer: String,
    /// Replaces the draft's stored workflow planner gate when present.
    #[serde(default)]
    pub workflow_planner_enabled: Option<bool>,
    /// Takes precedence over the draft's stored flag and the channel config when present.
    #[serde(default)]
    pub strict_kb_grounding: Option<bool>,
    /// Overrides the text-based factual-question classification of the answer.
    #[serde(default)]
    pub factual_question: Option<bool>,
    /// Overrides the text-based workflow-intent classification of the answer.
    #[serde(default)]
    pub explicit_workflow_intent: Option<bool>,
}
152
/// Query parameters for listing pending drafts; each absent filter matches everything.
#[derive(Debug, Deserialize, Default)]
pub struct ChannelAutomationDraftPendingQuery {
    /// Exact match against the draft's `channel_context.source_platform`.
    pub channel: Option<String>,
    /// Exact match against the draft's `channel_context.scope_id`.
    pub scope_id: Option<String>,
    /// Exact match against the draft's `channel_context.sender`.
    pub sender: Option<String>,
}
159
160impl AppState {
161    pub async fn load_channel_automation_drafts(&self) -> anyhow::Result<()> {
162        if !self.channel_automation_drafts_path.exists() {
163            return Ok(());
164        }
165        let raw = tokio::fs::read_to_string(&self.channel_automation_drafts_path).await?;
166        let parsed = serde_json::from_str::<HashMap<String, ChannelAutomationDraftRecord>>(&raw)
167            .unwrap_or_default();
168        *self.channel_automation_drafts.write().await = parsed;
169        Ok(())
170    }
171
172    pub async fn persist_channel_automation_drafts(&self) -> anyhow::Result<()> {
173        let payload = {
174            let guard = self.channel_automation_drafts.read().await;
175            serde_json::to_string_pretty(&*guard)?
176        };
177        if let Some(parent) = self.channel_automation_drafts_path.parent() {
178            tokio::fs::create_dir_all(parent).await?;
179        }
180        tokio::fs::write(&self.channel_automation_drafts_path, payload).await?;
181        Ok(())
182    }
183}
184
185pub(super) async fn channel_automation_drafts_start(
186    State(state): State<AppState>,
187    Json(input): Json<ChannelAutomationDraftStartRequest>,
188) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
189    let now = crate::now_ms();
190    let text = input.text.trim().to_string();
191    if text.is_empty() {
192        return Err(bad_request("draft text is required"));
193    }
194
195    let mut context = input.channel_context;
196    if context.session_id.is_none() {
197        context.session_id = input.session_id;
198    }
199    if context.thread_key.is_none() {
200        context.thread_key = input.thread_key;
201    }
202    let strict_kb_grounding = input
203        .strict_kb_grounding
204        .unwrap_or(channel_strict_kb_grounding(&state, &context.source_platform).await);
205    let factual_question = input
206        .factual_question
207        .unwrap_or_else(|| channel_draft_message_is_factual_question(&text));
208    let explicit_workflow_intent = input
209        .explicit_workflow_intent
210        .unwrap_or_else(|| channel_draft_message_has_explicit_workflow_intent(&text));
211    let mut draft = ChannelAutomationDraftRecord {
212        draft_id: format!("channel-draft-{}", Uuid::new_v4()),
213        status: ChannelAutomationDraftStatus::Collecting,
214        original_text: text.clone(),
215        goal: infer_goal(&text),
216        schedule_hint: infer_schedule_hint(&text),
217        delivery_target: Some(infer_delivery_target(&text)),
218        missing_fields: Vec::new(),
219        question: None,
220        preview: None,
221        automation_id: None,
222        allowed_tools: normalize_list(input.allowed_tools),
223        allowed_mcp_servers: normalize_list(input.allowed_mcp_servers),
224        allowed_mcp_tools: normalize_list(input.allowed_mcp_tools),
225        security_profile: input.security_profile,
226        workflow_planner_enabled: input.workflow_planner_enabled,
227        strict_kb_grounding: Some(strict_kb_grounding),
228        factual_question: Some(factual_question),
229        explicit_workflow_intent: Some(explicit_workflow_intent),
230        channel_context: context,
231        created_at_ms: now,
232        updated_at_ms: now,
233        expires_at_ms: now.saturating_add(CHANNEL_DRAFT_TTL_MS),
234    };
235    if let Some(payload) = producer_guarded_response(
236        &state,
237        &mut draft,
238        None,
239        ProducerCaller::Start,
240        strict_kb_grounding,
241        factual_question,
242        explicit_workflow_intent,
243    )
244    .await
245    {
246        return Ok(Json(payload));
247    }
248    advance_draft(&mut draft, now);
249    state
250        .channel_automation_drafts
251        .write()
252        .await
253        .insert(draft.draft_id.clone(), draft.clone());
254    persist_channel_drafts_or_log(&state).await;
255    Ok(Json(draft_response(&draft)))
256}
257
258pub(super) async fn channel_automation_drafts_answer(
259    State(state): State<AppState>,
260    Path(draft_id): Path<String>,
261    Json(input): Json<ChannelAutomationDraftAnswerRequest>,
262) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
263    let now = crate::now_ms();
264    let mut draft = get_mutable_draft(&state, &draft_id).await?;
265    ensure_open_draft(&draft, now)?;
266    let answer = input.answer.trim().to_string();
267    if answer.is_empty() {
268        return Err(bad_request("answer is required"));
269    }
270    let strict_kb_grounding = input
271        .strict_kb_grounding
272        .or(draft.strict_kb_grounding)
273        .unwrap_or(
274            channel_strict_kb_grounding(&state, &draft.channel_context.source_platform).await,
275        );
276    let factual_question = input
277        .factual_question
278        .unwrap_or_else(|| channel_draft_message_is_factual_question(&answer));
279    let explicit_workflow_intent = input
280        .explicit_workflow_intent
281        .unwrap_or_else(|| channel_draft_message_has_explicit_workflow_intent(&answer));
282    draft.workflow_planner_enabled = input
283        .workflow_planner_enabled
284        .or(draft.workflow_planner_enabled);
285    draft.strict_kb_grounding = Some(strict_kb_grounding);
286    draft.factual_question = Some(factual_question);
287    draft.explicit_workflow_intent = Some(explicit_workflow_intent);
288    if let Some(payload) = producer_guarded_response(
289        &state,
290        &mut draft,
291        Some(draft_id.as_str()),
292        ProducerCaller::Answer,
293        strict_kb_grounding,
294        factual_question,
295        explicit_workflow_intent,
296    )
297    .await
298    {
299        return Ok(Json(payload));
300    }
301    if is_cancel_text(&answer) {
302        draft.status = ChannelAutomationDraftStatus::Cancelled;
303        draft.question = None;
304        draft.updated_at_ms = now;
305        store_draft(&state, draft.clone()).await;
306        return Ok(Json(draft_response(&draft)));
307    }
308    match draft
309        .question
310        .as_ref()
311        .map(|question| question.field.as_str())
312    {
313        Some("goal") => {
314            if draft.schedule_hint.is_none() {
315                draft.schedule_hint = infer_schedule_hint(&answer);
316            }
317            draft.goal = Some(answer);
318        }
319        Some("schedule_hint") => draft.schedule_hint = Some(answer),
320        Some("delivery_target") => draft.delivery_target = Some(answer),
321        _ if draft.status == ChannelAutomationDraftStatus::PreviewReady => {}
322        _ => draft.goal = Some(answer),
323    }
324    advance_draft(&mut draft, now);
325    store_draft(&state, draft.clone()).await;
326    Ok(Json(draft_response(&draft)))
327}
328
329pub(super) async fn channel_automation_drafts_confirm(
330    State(state): State<AppState>,
331    Path(draft_id): Path<String>,
332) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
333    let now = crate::now_ms();
334    let mut draft = get_mutable_draft(&state, &draft_id).await?;
335    ensure_open_draft(&draft, now)?;
336    if draft.status != ChannelAutomationDraftStatus::PreviewReady {
337        return Err((
338            StatusCode::CONFLICT,
339            Json(json!({
340                "error": "Draft is not ready for confirmation",
341                "code": "CHANNEL_AUTOMATION_DRAFT_NOT_READY",
342                "draft": draft,
343            })),
344        ));
345    }
346    let automation = build_channel_automation(&draft, now);
347    let stored = state.put_automation_v2(automation).await.map_err(|error| {
348        (
349            StatusCode::BAD_REQUEST,
350            Json(json!({
351                "error": error.to_string(),
352                "code": "CHANNEL_AUTOMATION_CREATE_FAILED",
353            })),
354        )
355    })?;
356    draft.status = ChannelAutomationDraftStatus::Applied;
357    draft.automation_id = Some(stored.automation_id.clone());
358    draft.question = None;
359    draft.updated_at_ms = now;
360    store_draft(&state, draft.clone()).await;
361    let mut payload = draft_response(&draft);
362    if let Some(obj) = payload.as_object_mut() {
363        obj.insert("automation".to_string(), json!(stored));
364    }
365    Ok(Json(payload))
366}
367
368pub(super) async fn channel_automation_drafts_cancel(
369    State(state): State<AppState>,
370    Path(draft_id): Path<String>,
371) -> Result<Json<Value>, (StatusCode, Json<Value>)> {
372    let now = crate::now_ms();
373    let mut draft = get_mutable_draft(&state, &draft_id).await?;
374    draft.status = ChannelAutomationDraftStatus::Cancelled;
375    draft.question = None;
376    draft.updated_at_ms = now;
377    store_draft(&state, draft.clone()).await;
378    Ok(Json(draft_response(&draft)))
379}
380
381pub(super) async fn channel_automation_drafts_pending(
382    State(state): State<AppState>,
383    Query(query): Query<ChannelAutomationDraftPendingQuery>,
384) -> Json<Value> {
385    let now = crate::now_ms();
386    let drafts = state
387        .channel_automation_drafts
388        .read()
389        .await
390        .values()
391        .filter(|draft| draft.expires_at_ms > now)
392        .filter(|draft| {
393            query
394                .channel
395                .as_deref()
396                .map(|value| draft.channel_context.source_platform == value)
397                .unwrap_or(true)
398        })
399        .filter(|draft| {
400            query
401                .scope_id
402                .as_deref()
403                .map(|value| draft.channel_context.scope_id == value)
404                .unwrap_or(true)
405        })
406        .filter(|draft| {
407            query
408                .sender
409                .as_deref()
410                .map(|value| draft.channel_context.sender == value)
411                .unwrap_or(true)
412        })
413        .filter(|draft| {
414            matches!(
415                draft.status,
416                ChannelAutomationDraftStatus::Collecting
417                    | ChannelAutomationDraftStatus::PreviewReady
418                    | ChannelAutomationDraftStatus::Blocked
419            )
420        })
421        .cloned()
422        .collect::<Vec<_>>();
423    let count = drafts.len();
424    Json(json!({
425        "drafts": drafts,
426        "count": count,
427    }))
428}
429
430async fn get_mutable_draft(
431    state: &AppState,
432    draft_id: &str,
433) -> Result<ChannelAutomationDraftRecord, (StatusCode, Json<Value>)> {
434    state
435        .channel_automation_drafts
436        .read()
437        .await
438        .get(draft_id)
439        .cloned()
440        .ok_or_else(|| {
441            (
442                StatusCode::NOT_FOUND,
443                Json(json!({
444                    "error": "Channel automation draft not found",
445                    "code": "CHANNEL_AUTOMATION_DRAFT_NOT_FOUND",
446                    "draft_id": draft_id,
447                })),
448            )
449        })
450}
451
452async fn store_draft(state: &AppState, draft: ChannelAutomationDraftRecord) {
453    state
454        .channel_automation_drafts
455        .write()
456        .await
457        .insert(draft.draft_id.clone(), draft);
458    persist_channel_drafts_or_log(state).await;
459}
460
461async fn persist_channel_drafts_or_log(state: &AppState) {
462    if let Err(error) = state.persist_channel_automation_drafts().await {
463        tracing::warn!("failed to persist channel automation drafts: {error}");
464    }
465}
466
467fn ensure_open_draft(
468    draft: &ChannelAutomationDraftRecord,
469    now: u64,
470) -> Result<(), (StatusCode, Json<Value>)> {
471    if draft.expires_at_ms <= now {
472        return Err((
473            StatusCode::GONE,
474            Json(json!({
475                "error": "Channel automation draft expired",
476                "code": "CHANNEL_AUTOMATION_DRAFT_EXPIRED",
477                "draft_id": draft.draft_id,
478            })),
479        ));
480    }
481    match draft.status {
482        ChannelAutomationDraftStatus::Cancelled
483        | ChannelAutomationDraftStatus::Expired
484        | ChannelAutomationDraftStatus::Applied => Err((
485            StatusCode::CONFLICT,
486            Json(json!({
487                "error": "Channel automation draft is closed",
488                "code": "CHANNEL_AUTOMATION_DRAFT_CLOSED",
489                "draft_id": draft.draft_id,
490                "status": draft.status.clone(),
491            })),
492        )),
493        ChannelAutomationDraftStatus::Collecting
494        | ChannelAutomationDraftStatus::PreviewReady
495        | ChannelAutomationDraftStatus::Blocked => Ok(()),
496    }
497}
498
499fn advance_draft(draft: &mut ChannelAutomationDraftRecord, now: u64) {
500    draft.updated_at_ms = now;
501    draft.expires_at_ms = now.saturating_add(CHANNEL_DRAFT_TTL_MS);
502    draft.missing_fields.clear();
503    draft.question = None;
504    draft.preview = None;
505
506    if draft
507        .goal
508        .as_deref()
509        .map(str::trim)
510        .unwrap_or("")
511        .is_empty()
512    {
513        draft.status = ChannelAutomationDraftStatus::Collecting;
514        draft.missing_fields.push("goal".to_string());
515        draft.question = Some(ChannelAutomationDraftQuestion {
516            field: "goal".to_string(),
517            text: "What should this automation do?".to_string(),
518            options: Vec::new(),
519        });
520        return;
521    }
522    if draft
523        .schedule_hint
524        .as_deref()
525        .map(str::trim)
526        .unwrap_or("")
527        .is_empty()
528    {
529        draft.status = ChannelAutomationDraftStatus::Collecting;
530        draft.missing_fields.push("schedule_hint".to_string());
531        draft.question = Some(ChannelAutomationDraftQuestion {
532            field: "schedule_hint".to_string(),
533            text: "When should it run, or what event should trigger it?".to_string(),
534            options: vec![
535                "daily".to_string(),
536                "weekly".to_string(),
537                "when something changes".to_string(),
538            ],
539        });
540        return;
541    }
542
543    let goal = draft.goal.clone().unwrap_or_default();
544    let schedule_hint = draft.schedule_hint.clone().unwrap_or_default();
545    let delivery_target = draft
546        .delivery_target
547        .clone()
548        .unwrap_or_else(|| "same_chat".to_string());
549    draft.status = ChannelAutomationDraftStatus::PreviewReady;
550    draft.preview = Some(ChannelAutomationDraftPreview {
551        summary: format!(
552            "Run `{}` on `{}` and report to `{}`.",
553            truncate_for_label(&goal, 80),
554            schedule_hint,
555            delivery_target
556        ),
557        goal,
558        schedule_hint,
559        delivery_target,
560    });
561}
562
563fn draft_response(draft: &ChannelAutomationDraftRecord) -> Value {
564    let message = format_channel_draft_message(draft);
565    log_channel_automation_draft_response_producer(
566        draft,
567        None,
568        draft.workflow_planner_enabled,
569        draft.strict_kb_grounding.unwrap_or(false),
570        draft.factual_question.unwrap_or(false),
571        draft.explicit_workflow_intent.unwrap_or(false),
572        false,
573        true,
574        "draft_response",
575    );
576    json!({
577        "draft": draft,
578        "message": message,
579    })
580}
581
/// Identifies which handler invoked the producer guard, for logging purposes.
#[derive(Debug, Clone, Copy)]
enum ProducerCaller {
    /// The draft-start handler.
    Start,
    /// The draft-answer handler.
    Answer,
}

impl ProducerCaller {
    /// Name of the calling handler, used as the `reason` field of the producer log
    /// event when the guard lets a request through.
    fn label(self) -> &'static str {
        let handler_name: &'static str = match self {
            ProducerCaller::Answer => "channel_automation_drafts_answer",
            ProducerCaller::Start => "channel_automation_drafts_start",
        };
        handler_name
    }
}
596
597async fn producer_guarded_response(
598    state: &AppState,
599    draft: &mut ChannelAutomationDraftRecord,
600    pending_draft_id: Option<&str>,
601    caller: ProducerCaller,
602    strict_kb_grounding: bool,
603    factual_question: bool,
604    explicit_workflow_intent: bool,
605) -> Option<Value> {
606    let workflow_planner_enabled = draft.workflow_planner_enabled.unwrap_or(true);
607    let block_reason = if !workflow_planner_enabled {
608        Some((
609            "workflow_drafting_disabled",
610            CHANNEL_WORKFLOW_DRAFTING_DISABLED_MESSAGE,
611        ))
612    } else if strict_kb_grounding && factual_question && !explicit_workflow_intent {
613        Some((
614            "strict_kb_factual_question",
615            STRICT_KB_FACTUAL_DRAFT_BLOCKED_MESSAGE,
616        ))
617    } else {
618        None
619    };
620
621    let Some((reason, message)) = block_reason else {
622        log_channel_automation_draft_response_producer(
623            draft,
624            pending_draft_id,
625            Some(workflow_planner_enabled),
626            strict_kb_grounding,
627            factual_question,
628            explicit_workflow_intent,
629            false,
630            false,
631            caller.label(),
632        );
633        return None;
634    };
635
636    draft.status = ChannelAutomationDraftStatus::Cancelled;
637    draft.goal = None;
638    draft.schedule_hint = None;
639    draft.delivery_target = None;
640    draft.missing_fields.clear();
641    draft.question = None;
642    draft.preview = None;
643    draft.updated_at_ms = crate::now_ms();
644    draft.workflow_planner_enabled = Some(workflow_planner_enabled);
645    draft.strict_kb_grounding = Some(strict_kb_grounding);
646    draft.factual_question = Some(factual_question);
647    draft.explicit_workflow_intent = Some(explicit_workflow_intent);
648    store_draft(state, draft.clone()).await;
649    log_channel_automation_draft_response_producer(
650        draft,
651        pending_draft_id,
652        Some(workflow_planner_enabled),
653        strict_kb_grounding,
654        factual_question,
655        explicit_workflow_intent,
656        true,
657        false,
658        reason,
659    );
660    Some(json!({
661        "draft": draft,
662        "message": message,
663        "blocked": true,
664        "block_reason": reason,
665    }))
666}
667
668async fn channel_strict_kb_grounding(state: &AppState, channel: &str) -> bool {
669    let channel = channel.trim().to_ascii_lowercase();
670    if channel.is_empty() {
671        return false;
672    }
673    state
674        .config
675        .get_effective_value()
676        .await
677        .get("channels")
678        .and_then(Value::as_object)
679        .and_then(|channels| channels.get(&channel))
680        .and_then(Value::as_object)
681        .and_then(|cfg| cfg.get("strict_kb_grounding"))
682        .and_then(Value::as_bool)
683        .unwrap_or(false)
684}
685
/// Heuristically classifies a chat message as a factual question.
///
/// After trimming and ASCII-lowercasing, the message qualifies when it is non-empty,
/// does not start with `/`, contains a `?`, and begins with one of a fixed set of
/// question starters (each followed by a space).
fn channel_draft_message_is_factual_question(message: &str) -> bool {
    const QUESTION_STARTERS: &[&str] = &[
        "what ", "who ", "where ", "when ", "which ", "can ", "could ", "does ", "do ", "is ",
        "are ", "how ", "tell me ", "explain ",
    ];

    let normalized = message.trim().to_ascii_lowercase();
    let could_be_question =
        !normalized.is_empty() && !normalized.starts_with('/') && normalized.contains('?');

    could_be_question
        && QUESTION_STARTERS
            .iter()
            .any(|prefix| normalized.starts_with(*prefix))
}
698
/// Heuristically detects whether a chat message explicitly asks to author a workflow.
///
/// The message is trimmed and ASCII-lowercased; blank messages and messages starting
/// with `/` never qualify. Otherwise it qualifies when any of these substring checks
/// holds:
/// - it contains one of the explicit authoring phrases;
/// - it contains both a workflow target word and an authoring verb;
/// - it starts with `monitor ` and contains a cadence marker.
fn channel_draft_message_has_explicit_workflow_intent(message: &str) -> bool {
    const EXPLICIT_PHRASES: &[&str] = &[
        "create a workflow",
        "create workflow",
        "build a workflow",
        "build workflow",
        "set up a workflow",
        "setup a workflow",
        "make a workflow",
        "create an automation",
        "create automation",
        "build an automation",
        "build automation",
        "schedule a workflow",
        "schedule a daily report",
        "schedule a report",
        "schedule a reminder",
        "set up a bot",
        "setup a bot",
        "make a bot that runs",
        "create a bot that runs",
    ];
    const WORKFLOW_TARGETS: &[&str] = &["workflow", "automation", "automations", "bot", "reminder"];
    const AUTHORING_VERBS: &[&str] = &[
        "create", "build", "make", "draft", "schedule", "automate", "set up", "setup",
    ];
    const CADENCE_MARKERS: &[&str] = &[
        " every ",
        " each ",
        "daily",
        "weekly",
        "hourly",
        "every morning",
    ];

    let normalized = message.trim().to_ascii_lowercase();
    if normalized.is_empty() || normalized.starts_with('/') {
        return false;
    }

    let mentions_any =
        |needles: &[&str]| needles.iter().any(|needle| normalized.contains(*needle));

    if mentions_any(EXPLICIT_PHRASES) {
        return true;
    }
    if mentions_any(WORKFLOW_TARGETS) && mentions_any(AUTHORING_VERBS) {
        return true;
    }
    normalized.starts_with("monitor ") && mentions_any(CADENCE_MARKERS)
}
750
/// Emits one structured `tracing` event describing a draft response decision.
///
/// The event records the draft's channel context (the `channel` and `platform` fields
/// both carry `source_platform`), the draft id, the optional pending draft id, the guard
/// flags, whether the request was `blocked`, whether a response was `emitted`, and a
/// free-form `reason`.
///
/// NOTE(review): the event is logged at WARN level on every call, including the
/// non-blocked path — confirm this level is intended rather than leftover diagnostics.
// The argument list mirrors the logged fields one-to-one, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
fn log_channel_automation_draft_response_producer(
    draft: &ChannelAutomationDraftRecord,
    pending_draft_id: Option<&str>,
    workflow_planner_enabled: Option<bool>,
    strict_kb_grounding: bool,
    factual_question: bool,
    explicit_workflow_intent: bool,
    blocked: bool,
    emitted: bool,
    reason: &str,
) {
    tracing::warn!(
        prefix = "CHANNEL_AUTOMATION_DRAFT_RESPONSE_PRODUCER",
        channel = %draft.channel_context.source_platform,
        platform = %draft.channel_context.source_platform,
        session_id = ?draft.channel_context.session_id,
        scope_id = %draft.channel_context.scope_id,
        draft_id = %draft.draft_id,
        pending_draft_id = ?pending_draft_id,
        workflow_planner_enabled = ?workflow_planner_enabled,
        strict_kb_grounding,
        factual_question,
        explicit_workflow_intent,
        blocked,
        emitted,
        reason,
        "CHANNEL_AUTOMATION_DRAFT_RESPONSE_PRODUCER"
    );
}
781
782fn format_channel_draft_message(draft: &ChannelAutomationDraftRecord) -> String {
783    match draft.status {
784        ChannelAutomationDraftStatus::Collecting => {
785            let question = draft
786                .question
787                .as_ref()
788                .map(|question| {
789                    let mut lines = vec![question.text.clone()];
790                    for (index, option) in question.options.iter().enumerate() {
791                        lines.push(format!("{}. {}", index + 1, option));
792                    }
793                    lines.join("\n")
794                })
795                .unwrap_or_else(|| {
796                    "I need one more detail before I can draft this automation.".to_string()
797                });
798            format!("{question}\nReply here with the answer, or reply `cancel` to stop.")
799        }
800        ChannelAutomationDraftStatus::PreviewReady => {
801            let preview = draft.preview.as_ref();
802            let summary = preview
803                .map(|value| value.summary.clone())
804                .unwrap_or_else(|| "Automation draft is ready.".to_string());
805            format!("{summary}\nReply `confirm` to create it, or `cancel` to stop.")
806        }
807        ChannelAutomationDraftStatus::Applied => {
808            let id = draft.automation_id.as_deref().unwrap_or("unknown");
809            format!("Automation created: `{id}`")
810        }
811        ChannelAutomationDraftStatus::Cancelled => "Automation draft cancelled.".to_string(),
812        ChannelAutomationDraftStatus::Expired => "Automation draft expired.".to_string(),
813        ChannelAutomationDraftStatus::Blocked => {
814            "Automation draft is blocked until required channel setup is completed.".to_string()
815        }
816    }
817}
818
819fn build_channel_automation(draft: &ChannelAutomationDraftRecord, now: u64) -> AutomationV2Spec {
820    let goal = draft
821        .goal
822        .clone()
823        .unwrap_or_else(|| draft.original_text.clone());
824    let schedule_hint = draft.schedule_hint.clone().unwrap_or_default();
825    let automation_id = format!("automation-v2-{}", Uuid::new_v4());
826    let agent_id = format!("agent-{}", Uuid::new_v4());
827    let output_target = format!(
828        "channel:{}:{}",
829        empty_as_unknown(&draft.channel_context.source_platform),
830        empty_as_unknown(&draft.channel_context.scope_id)
831    );
832    let metadata = json!({
833        "created_from": "channel_automation_draft",
834        "draft_id": draft.draft_id,
835        "channel_context": draft.channel_context,
836        "schedule_hint": schedule_hint,
837        "delivery_target": draft.delivery_target,
838        "security_profile": draft.security_profile,
839        "allowed_tools": draft.allowed_tools,
840        "allowed_mcp_servers": draft.allowed_mcp_servers,
841        "allowed_mcp_tools": draft.allowed_mcp_tools,
842        "confirmation_required": true,
843        "confirmed_at_ms": now,
844    });
845    AutomationV2Spec {
846        automation_id,
847        name: automation_name_from_goal(&goal),
848        description: Some(format!("Created from channel chat: {goal}")),
849        status: AutomationV2Status::Active,
850        schedule: schedule_from_hint(&schedule_hint),
851        knowledge: tandem_orchestrator::KnowledgeBinding::default(),
852        agents: vec![AutomationAgentProfile {
853            agent_id: agent_id.clone(),
854            template_id: None,
855            display_name: "Channel Automation Agent".to_string(),
856            avatar_url: None,
857            model_policy: None,
858            skills: Vec::new(),
859            tool_policy: AutomationAgentToolPolicy {
860                allowlist: draft.allowed_tools.clone(),
861                denylist: Vec::new(),
862            },
863            mcp_policy: AutomationAgentMcpPolicy {
864                allowed_servers: draft.allowed_mcp_servers.clone(),
865                allowed_tools: if draft.allowed_mcp_tools.is_empty() {
866                    None
867                } else {
868                    Some(draft.allowed_mcp_tools.clone())
869                },
870            },
871            approval_policy: None,
872        }],
873        flow: AutomationFlowSpec {
874            nodes: vec![AutomationFlowNode {
875                node_id: "channel_automation_task".to_string(),
876                agent_id,
877                objective: format!(
878                    "{goal}\n\nReport results back to the originating chat context."
879                ),
880                knowledge: tandem_orchestrator::KnowledgeBinding::default(),
881                depends_on: Vec::new(),
882                input_refs: Vec::new(),
883                output_contract: Some(AutomationFlowOutputContract {
884                    kind: "channel_response".to_string(),
885                    validator: Some(AutomationOutputValidatorKind::GenericArtifact),
886                    enforcement: None,
887                    schema: None,
888                    summary_guidance: Some(
889                        "Summarize the outcome in channel-safe language.".to_string(),
890                    ),
891                }),
892                retry_policy: None,
893                timeout_ms: None,
894                max_tool_calls: Some(16),
895                stage_kind: None,
896                gate: None,
897                metadata: Some(json!({
898                    "created_from": "channel_automation_draft",
899                    "source_platform": draft.channel_context.source_platform,
900                    "source_scope_id": draft.channel_context.scope_id,
901                })),
902            }],
903        },
904        execution: AutomationExecutionPolicy {
905            max_parallel_agents: Some(1),
906            max_total_runtime_ms: Some(15 * 60 * 1000),
907            max_total_tool_calls: Some(24),
908            max_total_tokens: None,
909            max_total_cost_usd: None,
910        },
911        output_targets: vec![output_target],
912        created_at_ms: now,
913        updated_at_ms: now,
914        creator_id: if draft.channel_context.sender.trim().is_empty() {
915            "channel".to_string()
916        } else {
917            draft.channel_context.sender.clone()
918        },
919        workspace_root: None,
920        metadata: Some(metadata),
921        next_fire_at_ms: None,
922        last_fired_at_ms: None,
923        scope_policy: None,
924        watch_conditions: Vec::new(),
925        handoff_config: None,
926    }
927}
928
929fn schedule_from_hint(hint: &str) -> AutomationV2Schedule {
930    let lower = hint.to_ascii_lowercase();
931    if lower.contains("daily") || lower.contains("every day") {
932        if let Some((hour, minute)) = parse_daily_time_hint(&lower) {
933            return AutomationV2Schedule {
934                schedule_type: AutomationV2ScheduleType::Cron,
935                cron_expression: Some(format!("0 {minute} {hour} * * * *")),
936                interval_seconds: None,
937                timezone: "UTC".to_string(),
938                misfire_policy: RoutineMisfirePolicy::RunOnce,
939            };
940        }
941    }
942    let interval_seconds = if lower.contains("hourly") || lower.contains("every hour") {
943        Some(60 * 60)
944    } else if lower.contains("weekly") || lower.contains("every week") {
945        Some(7 * 24 * 60 * 60)
946    } else if lower.contains("daily") || lower.contains("every day") {
947        Some(24 * 60 * 60)
948    } else {
949        None
950    };
951    AutomationV2Schedule {
952        schedule_type: interval_seconds
953            .map(|_| AutomationV2ScheduleType::Interval)
954            .unwrap_or(AutomationV2ScheduleType::Manual),
955        cron_expression: None,
956        interval_seconds,
957        timezone: "UTC".to_string(),
958        misfire_policy: RoutineMisfirePolicy::RunOnce,
959    }
960}
961
/// Extracts a 24-hour `(hour, minute)` pair from an already-lowercased schedule hint.
///
/// The time is read from the word that follows the first `" at "`; `" around "` is only
/// consulted when the hint contains no `" at "`. Accepted shapes are `9`, `9:30`, `9am`,
/// `9:30pm`, and the same with the meridiem written as a separate word (`9 pm`,
/// `9:30 am`). Trailing `.` and `,` on either word are ignored.
///
/// Returns `None` when no marker is present, when the word is not a number (for example
/// `noon`), or when the hour exceeds 23 or the minute exceeds 59.
fn parse_daily_time_hint(lower: &str) -> Option<(u32, u32)> {
    let marker = lower
        .find(" at ")
        .map(|index| index + 4)
        .or_else(|| lower.find(" around ").map(|index| index + 8))?;
    let mut tokens = lower
        .get(marker..)?
        .split_whitespace()
        .map(|word| word.trim_matches(|ch: char| ch == '.' || ch == ','));
    let token = tokens.next().filter(|word| !word.is_empty())?;

    // The meridiem may be glued to the number ("9pm") or be the next word ("9 pm").
    // Only an exact "am"/"pm" word counts, so "9 amazing" is still read as plain 9.
    let (numeric, meridiem) = if let Some(rest) = token.strip_suffix("am") {
        (rest, Some("am"))
    } else if let Some(rest) = token.strip_suffix("pm") {
        (rest, Some("pm"))
    } else {
        (
            token,
            tokens.next().filter(|word| matches!(*word, "am" | "pm")),
        )
    };

    let mut pieces = numeric.split(':');
    let mut hour = pieces.next()?.parse::<u32>().ok()?;
    let minute = match pieces.next() {
        Some(value) => value.parse::<u32>().ok()?,
        None => 0,
    };
    if hour > 23 || minute > 59 {
        return None;
    }

    // Convert 12-hour input to 24-hour: 1pm..11pm shift by twelve, 12am is midnight.
    match meridiem {
        Some("pm") if hour < 12 => hour += 12,
        Some("am") if hour == 12 => hour = 0,
        _ => {}
    }
    Some((hour, minute))
}
995
/// Extracts the automation goal from a chat message.
///
/// Returns `None` when the message is only a generic request (such as
/// "create an automation", with or without trailing `.`, `!`, or `?`) or when fewer than
/// eight characters remain after a lead-in phrase has been removed. Otherwise returns the
/// trimmed message with one recognised lead-in (for example "create an automation that"
/// or "automate") stripped from the front.
///
/// Matching is ASCII case-insensitive; the returned goal keeps the original casing.
fn infer_goal(text: &str) -> Option<String> {
    const GENERIC_REQUESTS: [&str; 6] = [
        "create an automation",
        "create automation",
        "make an automation",
        "set up an automation",
        "setup an automation",
        "automate this",
    ];
    const LEAD_INS: [&str; 7] = [
        "create an automation that",
        "create automation that",
        "make an automation that",
        "set up an automation that",
        "setup an automation that",
        "automate",
        "please automate",
    ];
    const MIN_GOAL_CHARS: usize = 8;

    let trimmed = text.trim();
    let lower = trimmed.to_ascii_lowercase();

    // Trailing sentence punctuation must not turn a content-free request into a goal.
    let bare = lower
        .trim_end_matches(|ch: char| matches!(ch, '.' | '!' | '?'))
        .trim_end();
    if GENERIC_REQUESTS.iter().any(|value| *value == bare) {
        return None;
    }

    let candidate = LEAD_INS
        .iter()
        .find_map(|prefix| {
            let rest = lower.strip_prefix(*prefix)?;
            // Only strip whole words: "automate" must not eat the front of "automated".
            let at_word_boundary = rest.chars().next().map_or(true, |ch| !ch.is_alphanumeric());
            if !at_word_boundary {
                return None;
            }
            // ASCII lowercasing preserves byte offsets, so the prefix length measured on
            // `lower` is a valid cut point in `trimmed`.
            trimmed.get(prefix.len()..).map(str::trim)
        })
        .unwrap_or(trimmed);

    (candidate.chars().count() >= MIN_GOAL_CHARS).then(|| candidate.to_string())
}
1031
1032fn infer_schedule_hint(text: &str) -> Option<String> {
1033    let lower = text.to_ascii_lowercase();
1034    if lower.contains("hourly") || lower.contains("every hour") {
1035        return Some("hourly".to_string());
1036    }
1037    if lower.contains("weekly") || lower.contains("every week") {
1038        return Some("weekly".to_string());
1039    }
1040    if lower.contains("daily") || lower.contains("every day") || lower.contains("each day") {
1041        return Some(extract_daily_hint(text));
1042    }
1043    if lower.starts_with("when ")
1044        || lower.contains(" whenever ")
1045        || lower.contains(" when ")
1046        || lower.contains(" on new ")
1047        || lower.contains(" new issue")
1048    {
1049        return Some("event-driven".to_string());
1050    }
1051    None
1052}
1053
/// Builds a `"daily at <time>"` hint from a message, falling back to plain `"daily"`.
///
/// The time is taken as up to two whitespace-separated words following the first
/// `" at "`; if that yields nothing, the first `" around "` is tried instead. Leading and
/// trailing `.` / `,` are removed from the extracted phrase. Markers are located
/// case-insensitively (ASCII), while the phrase itself keeps the caller's casing.
fn extract_daily_hint(text: &str) -> String {
    let lowered = text.to_ascii_lowercase();
    [" at ", " around "]
        .iter()
        .find_map(|marker| {
            // ASCII lowercasing keeps byte offsets aligned between `lowered` and `text`.
            let start = lowered.find(*marker)? + marker.len();
            let words: Vec<&str> = text[start..].split_whitespace().take(2).collect();
            if words.is_empty() {
                return None;
            }
            let phrase = words.join(" ");
            Some(format!(
                "daily at {}",
                phrase.trim_matches(|ch: char| ch == '.' || ch == ',')
            ))
        })
        .unwrap_or_else(|| "daily".to_string())
}
1073
/// Resolves where an automation's results should be delivered.
///
/// Every message resolves to `"same_chat"`, whether or not it mentions "here",
/// "this channel", or "this chat". The previous implementation tested for those phrases
/// but returned the same value from both branches, so the test and its lowercase copy of
/// the message were dead work.
fn infer_delivery_target(text: &str) -> String {
    // `text` is intentionally unread: no wording currently selects a different target.
    let _ = text;
    "same_chat".to_string()
}
1082
1083fn automation_name_from_goal(goal: &str) -> String {
1084    let label = truncate_for_label(goal.trim(), 56);
1085    if label.is_empty() {
1086        "Channel automation".to_string()
1087    } else {
1088        label
1089    }
1090}
1091
/// Shortens `value` to at most `max_chars` characters (Unicode scalar values).
///
/// Text that already fits is returned unchanged. Longer text is cut and finished with
/// `"..."`, with the ellipsis counted against the limit. When `max_chars` is smaller than
/// the ellipsis itself there is no room for it, so the text is simply cut to `max_chars`
/// characters; previously this case produced `"..."`, which exceeded the limit.
fn truncate_for_label(value: &str, max_chars: usize) -> String {
    const ELLIPSIS: &str = "...";

    // `nth(max_chars)` is `None` exactly when the text has at most `max_chars` characters,
    // and it stops scanning as soon as the limit is passed.
    if value.chars().nth(max_chars).is_none() {
        return value.to_string();
    }

    // ELLIPSIS is ASCII, so its byte length equals its character count.
    let ellipsis_chars = ELLIPSIS.len();
    if max_chars < ellipsis_chars {
        return value.chars().take(max_chars).collect();
    }

    let mut clipped: String = value.chars().take(max_chars - ellipsis_chars).collect();
    clipped.push_str(ELLIPSIS);
    clipped
}
1103
/// Cleans up a list of strings: trims each entry, drops entries that are empty after
/// trimming, and returns the remainder sorted with duplicates removed.
fn normalize_list(values: Vec<String>) -> Vec<String> {
    let mut cleaned: Vec<String> = Vec::new();
    for raw in values {
        let entry = raw.trim();
        if !entry.is_empty() {
            cleaned.push(entry.to_string());
        }
    }
    // Sorting first makes equal entries adjacent, which is what `dedup` requires.
    cleaned.sort();
    cleaned.dedup();
    cleaned
}
1114
/// Substitutes `"unknown"` for a blank value.
///
/// A value counts as blank when it is empty or whitespace-only. Non-blank values are
/// returned exactly as given, including any surrounding whitespace.
fn empty_as_unknown(value: &str) -> &str {
    match value.trim() {
        "" => "unknown",
        _ => value,
    }
}
1122
/// Reports whether a message is, in its entirety, a cancellation phrase.
///
/// Surrounding whitespace and ASCII letter case are ignored; the message must match one
/// of the phrases exactly rather than merely contain it.
fn is_cancel_text(value: &str) -> bool {
    const CANCEL_PHRASES: [&str; 5] = ["cancel", "stop", "abort", "never mind", "nevermind"];

    let candidate = value.trim();
    CANCEL_PHRASES
        .iter()
        .any(|phrase| candidate.eq_ignore_ascii_case(phrase))
}
1129
1130fn bad_request(detail: &str) -> (StatusCode, Json<Value>) {
1131    (
1132        StatusCode::BAD_REQUEST,
1133        Json(json!({
1134            "error": "Invalid channel automation draft request",
1135            "code": "CHANNEL_AUTOMATION_DRAFT_INVALID",
1136            "detail": detail,
1137        })),
1138    )
1139}