Skip to main content

tuitbot_core/workflow/
orchestrate.rs

1//! Deterministic orchestrator for autopilot discovery cycles.
2//!
3//! Composes the atomic workflow steps (discover → draft → queue) into a single
4//! entrypoint that autopilot loops and batch operations can call.
5//!
6//! This replaces the pattern of each consumer reimplementing the composition.
7
8use std::sync::Arc;
9
10use crate::config::Config;
11use crate::llm::LlmProvider;
12use crate::storage::DbPool;
13use crate::x_api::XApiClient;
14
15use super::discover::{self, DiscoverInput};
16use super::draft::{self, DraftInput};
17use super::queue::{self, QueueInput};
18use super::{DraftResult, ProposeResult, QueueItem, ScoredCandidate, WorkflowError};
19
/// Input for a full discovery cycle.
///
/// All search-related fields are optional and are forwarded unchanged to the
/// discover step. [`Default`] yields an input with every option unset and
/// `mention_product` disabled, so callers can override only what they need:
/// `CycleInput { limit: Some(10), ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CycleInput {
    /// Search query (optional — falls back to product keywords).
    pub query: Option<String>,
    /// Minimum score threshold passed to the discover step.
    pub min_score: Option<f64>,
    /// Maximum number of candidates to discover.
    pub limit: Option<u32>,
    /// Only discover tweets newer than this ID.
    pub since_id: Option<String>,
    /// Whether to mention the product in generated replies.
    ///
    /// Forwarded to both the draft step and the queue step.
    pub mention_product: bool,
}
34
35/// Report from a completed discovery cycle.
36#[derive(Debug, Clone)]
37pub struct CycleReport {
38    /// Candidates found during discovery.
39    pub discovered: Vec<ScoredCandidate>,
40    /// The search query that was used.
41    pub query_used: String,
42    /// Draft results (one per candidate that was drafted).
43    pub drafts: Vec<DraftResult>,
44    /// Queue/execution results.
45    pub queued: Vec<ProposeResult>,
46    /// Summary counts.
47    pub summary: CycleSummary,
48}
49
/// Summary statistics for the cycle.
///
/// [`Default`] yields an all-zero summary, which is what a cycle that found
/// nothing to act on reports (apart from `candidates_found`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CycleSummary {
    /// Number of candidates returned by the discover step, actionable or not.
    pub candidates_found: usize,
    /// Number of drafts that were generated successfully.
    pub drafts_generated: usize,
    /// Number of draft attempts that did not succeed.
    pub drafts_failed: usize,
    /// Number of queue results reported as queued.
    pub replies_queued: usize,
    /// Number of queue results reported as executed.
    pub replies_executed: usize,
    /// Number of queue results reported as blocked.
    pub replies_blocked: usize,
}
60
61/// Run a complete discovery cycle: discover → draft → queue.
62///
63/// This is the canonical entrypoint for autopilot loops. It composes the
64/// three atomic steps in sequence, passing outputs forward.
65///
66/// The cycle is deterministic: given the same inputs and external state,
67/// it produces the same outputs. Side effects (DB writes, X API calls)
68/// are explicit and auditable through the step functions.
69pub async fn run_discovery_cycle(
70    db: &DbPool,
71    x_client: &dyn XApiClient,
72    llm: &Arc<dyn LlmProvider>,
73    config: &Config,
74    input: CycleInput,
75) -> Result<CycleReport, WorkflowError> {
76    // Step 1: Discover — search, score, rank
77    let discover_output = discover::execute(
78        db,
79        x_client,
80        config,
81        DiscoverInput {
82            query: input.query,
83            min_score: input.min_score,
84            limit: input.limit,
85            since_id: input.since_id,
86        },
87    )
88    .await?;
89
90    if discover_output.candidates.is_empty() {
91        return Ok(CycleReport {
92            query_used: discover_output.query_used,
93            discovered: vec![],
94            drafts: vec![],
95            queued: vec![],
96            summary: CycleSummary {
97                candidates_found: 0,
98                drafts_generated: 0,
99                drafts_failed: 0,
100                replies_queued: 0,
101                replies_executed: 0,
102                replies_blocked: 0,
103            },
104        });
105    }
106
107    // Filter to actionable candidates (skip already-replied and low-action)
108    let actionable_ids: Vec<String> = discover_output
109        .candidates
110        .iter()
111        .filter(|c| !c.already_replied && c.recommended_action != "skip")
112        .map(|c| c.tweet_id.clone())
113        .collect();
114
115    let candidates_found = discover_output.candidates.len();
116
117    if actionable_ids.is_empty() {
118        return Ok(CycleReport {
119            query_used: discover_output.query_used,
120            discovered: discover_output.candidates,
121            drafts: vec![],
122            queued: vec![],
123            summary: CycleSummary {
124                candidates_found,
125                drafts_generated: 0,
126                drafts_failed: 0,
127                replies_queued: 0,
128                replies_executed: 0,
129                replies_blocked: 0,
130            },
131        });
132    }
133
134    // Step 2: Draft — generate LLM replies for actionable candidates
135    let drafts = draft::execute(
136        db,
137        llm,
138        config,
139        DraftInput {
140            candidate_ids: actionable_ids,
141            archetype: None,
142            mention_product: input.mention_product,
143        },
144    )
145    .await?;
146
147    // Collect successful drafts as queue items
148    let queue_items: Vec<QueueItem> = drafts
149        .iter()
150        .filter_map(|d| match d {
151            DraftResult::Success {
152                candidate_id,
153                draft_text,
154                ..
155            } => Some(QueueItem {
156                candidate_id: candidate_id.clone(),
157                pre_drafted_text: Some(draft_text.clone()),
158            }),
159            DraftResult::Error { .. } => None,
160        })
161        .collect();
162
163    let drafts_generated = queue_items.len();
164    let drafts_failed = drafts.len() - drafts_generated;
165
166    // Step 3: Queue — safety check + route to approval or execute
167    let queued = if queue_items.is_empty() {
168        vec![]
169    } else {
170        queue::execute(
171            db,
172            Some(x_client),
173            Some(llm),
174            config,
175            QueueInput {
176                items: queue_items,
177                mention_product: input.mention_product,
178            },
179        )
180        .await?
181    };
182
183    // Count results
184    let replies_queued = queued
185        .iter()
186        .filter(|r| matches!(r, ProposeResult::Queued { .. }))
187        .count();
188    let replies_executed = queued
189        .iter()
190        .filter(|r| matches!(r, ProposeResult::Executed { .. }))
191        .count();
192    let replies_blocked = queued
193        .iter()
194        .filter(|r| matches!(r, ProposeResult::Blocked { .. }))
195        .count();
196
197    Ok(CycleReport {
198        query_used: discover_output.query_used,
199        discovered: discover_output.candidates,
200        drafts,
201        queued,
202        summary: CycleSummary {
203            candidates_found,
204            drafts_generated,
205            drafts_failed,
206            replies_queued,
207            replies_executed,
208            replies_blocked,
209        },
210    })
211}