// tuitbot_core/workflow/orchestrate.rs

//! Deterministic orchestrator for autopilot discovery cycles.
//!
//! Composes the atomic workflow steps (discover → draft → queue) into a single
//! entrypoint that autopilot loops and batch operations can call.
//!
//! This replaces the pattern of each consumer reimplementing the composition.
7
8use std::sync::Arc;
9
10use crate::config::Config;
11use crate::llm::LlmProvider;
12use crate::storage::DbPool;
13use crate::x_api::XApiClient;
14
15use super::discover::{self, DiscoverInput};
16use super::draft::{self, DraftInput};
17use super::queue::{self, QueueInput};
18use super::{DraftResult, ProposeResult, QueueItem, ScoredCandidate, WorkflowError};
19
/// Input for a full discovery cycle.
///
/// Every field is optional or a plain flag, so the type derives [`Default`]:
/// `CycleInput::default()` leaves all optional fields as `None` and
/// `mention_product` as `false`. Use struct-update syntax to override only
/// the fields you care about:
/// `CycleInput { limit: Some(20), ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct CycleInput {
    /// Search query (optional — falls back to product keywords).
    pub query: Option<String>,
    /// Minimum score threshold, forwarded to the discover step.
    pub min_score: Option<f64>,
    /// Maximum candidates to discover.
    pub limit: Option<u32>,
    /// Only discover tweets newer than this ID.
    pub since_id: Option<String>,
    /// Whether to mention the product in generated replies.
    ///
    /// Forwarded to both the draft step and the queue step.
    pub mention_product: bool,
}
34
35/// Report from a completed discovery cycle.
36#[derive(Debug, Clone)]
37pub struct CycleReport {
38    /// Candidates found during discovery.
39    pub discovered: Vec<ScoredCandidate>,
40    /// The search query that was used.
41    pub query_used: String,
42    /// Draft results (one per candidate that was drafted).
43    pub drafts: Vec<DraftResult>,
44    /// Queue/execution results.
45    pub queued: Vec<ProposeResult>,
46    /// Summary counts.
47    pub summary: CycleSummary,
48}
49
/// Summary statistics for the cycle.
///
/// Derives [`Default`], so `CycleSummary::default()` is the all-zero summary,
/// and [`PartialEq`]/[`Eq`] so summaries can be compared directly in
/// assertions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CycleSummary {
    /// Total number of candidates returned by the discover step (actionable
    /// or not).
    pub candidates_found: usize,
    /// Number of drafts that succeeded and were handed to the queue step.
    pub drafts_generated: usize,
    /// Number of draft results that were errors.
    pub drafts_failed: usize,
    /// Number of queue results that were `ProposeResult::Queued`.
    pub replies_queued: usize,
    /// Number of queue results that were `ProposeResult::Executed`.
    pub replies_executed: usize,
    /// Number of queue results that were `ProposeResult::Blocked`.
    pub replies_blocked: usize,
}
60
61/// Run a complete discovery cycle: discover → draft → queue.
62///
63/// This is the canonical entrypoint for autopilot loops. It composes the
64/// three atomic steps in sequence, passing outputs forward.
65///
66/// The cycle is deterministic: given the same inputs and external state,
67/// it produces the same outputs. Side effects (DB writes, X API calls)
68/// are explicit and auditable through the step functions.
69pub async fn run_discovery_cycle(
70    db: &DbPool,
71    x_client: &dyn XApiClient,
72    llm: &Arc<dyn LlmProvider>,
73    config: &Config,
74    input: CycleInput,
75) -> Result<CycleReport, WorkflowError> {
76    // Step 1: Discover — search, score, rank
77    let discover_output = discover::execute(
78        db,
79        x_client,
80        config,
81        DiscoverInput {
82            query: input.query,
83            min_score: input.min_score,
84            limit: input.limit,
85            since_id: input.since_id,
86        },
87    )
88    .await?;
89
90    if discover_output.candidates.is_empty() {
91        return Ok(CycleReport {
92            query_used: discover_output.query_used,
93            discovered: vec![],
94            drafts: vec![],
95            queued: vec![],
96            summary: CycleSummary {
97                candidates_found: 0,
98                drafts_generated: 0,
99                drafts_failed: 0,
100                replies_queued: 0,
101                replies_executed: 0,
102                replies_blocked: 0,
103            },
104        });
105    }
106
107    // Filter to actionable candidates (skip already-replied and low-action)
108    let actionable_ids: Vec<String> = discover_output
109        .candidates
110        .iter()
111        .filter(|c| !c.already_replied && c.recommended_action != "skip")
112        .map(|c| c.tweet_id.clone())
113        .collect();
114
115    let candidates_found = discover_output.candidates.len();
116
117    if actionable_ids.is_empty() {
118        return Ok(CycleReport {
119            query_used: discover_output.query_used,
120            discovered: discover_output.candidates,
121            drafts: vec![],
122            queued: vec![],
123            summary: CycleSummary {
124                candidates_found,
125                drafts_generated: 0,
126                drafts_failed: 0,
127                replies_queued: 0,
128                replies_executed: 0,
129                replies_blocked: 0,
130            },
131        });
132    }
133
134    // Step 2: Draft — generate LLM replies for actionable candidates
135    let drafts = draft::execute(
136        db,
137        llm,
138        config,
139        DraftInput {
140            candidate_ids: actionable_ids,
141            archetype: None,
142            mention_product: input.mention_product,
143            account_id: None,
144        },
145    )
146    .await?;
147
148    // Collect successful drafts as queue items
149    let queue_items: Vec<QueueItem> = drafts
150        .iter()
151        .filter_map(|d| match d {
152            DraftResult::Success {
153                candidate_id,
154                draft_text,
155                ..
156            } => Some(QueueItem {
157                candidate_id: candidate_id.clone(),
158                pre_drafted_text: Some(draft_text.clone()),
159            }),
160            DraftResult::Error { .. } => None,
161        })
162        .collect();
163
164    let drafts_generated = queue_items.len();
165    let drafts_failed = drafts.len() - drafts_generated;
166
167    // Step 3: Queue — safety check + route to approval or execute
168    let queued = if queue_items.is_empty() {
169        vec![]
170    } else {
171        queue::execute(
172            db,
173            Some(x_client),
174            Some(llm),
175            config,
176            QueueInput {
177                items: queue_items,
178                mention_product: input.mention_product,
179            },
180        )
181        .await?
182    };
183
184    // Count results
185    let replies_queued = queued
186        .iter()
187        .filter(|r| matches!(r, ProposeResult::Queued { .. }))
188        .count();
189    let replies_executed = queued
190        .iter()
191        .filter(|r| matches!(r, ProposeResult::Executed { .. }))
192        .count();
193    let replies_blocked = queued
194        .iter()
195        .filter(|r| matches!(r, ProposeResult::Blocked { .. }))
196        .count();
197
198    Ok(CycleReport {
199        query_used: discover_output.query_used,
200        discovered: discover_output.candidates,
201        drafts,
202        queued,
203        summary: CycleSummary {
204            candidates_found,
205            drafts_generated,
206            drafts_failed,
207            replies_queued,
208            replies_executed,
209            replies_blocked,
210        },
211    })
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::ScoreBreakdown;

    // ── Fixtures ────────────────────────────────────────────────────

    /// Candidate fixture: only the tweet id, the replied flag and the
    /// recommended action vary between tests.
    fn make_candidate(tweet_id: &str, already_replied: bool, action: &str) -> ScoredCandidate {
        let score_breakdown = ScoreBreakdown {
            keyword_relevance: 10.0,
            follower: 10.0,
            recency: 10.0,
            engagement: 10.0,
            reply_count: 5.0,
            content_type: 5.0,
        };

        ScoredCandidate {
            tweet_id: tweet_id.to_string(),
            author_username: "user".to_string(),
            author_followers: 100,
            text: "test".to_string(),
            created_at: "2026-01-01T00:00:00Z".to_string(),
            score_total: 50.0,
            score_breakdown,
            matched_keywords: vec!["test".to_string()],
            recommended_action: action.to_string(),
            already_replied,
        }
    }

    /// Mirrors the actionable-candidate filter used by the discovery cycle:
    /// drop anything already replied to or recommended as "skip".
    fn actionable_ids(candidates: &[ScoredCandidate]) -> Vec<String> {
        let mut ids = Vec::new();
        for candidate in candidates {
            if candidate.already_replied || candidate.recommended_action == "skip" {
                continue;
            }
            ids.push(candidate.tweet_id.clone());
        }
        ids
    }

    /// All-zero summary used by the empty-report tests.
    fn zero_summary() -> CycleSummary {
        CycleSummary {
            candidates_found: 0,
            drafts_generated: 0,
            drafts_failed: 0,
            replies_queued: 0,
            replies_executed: 0,
            replies_blocked: 0,
        }
    }

    // ── Actionable candidate filtering ──────────────────────────────

    #[test]
    fn filter_actionable_excludes_already_replied() {
        let actionable = actionable_ids(&[
            make_candidate("t1", false, "reply"),
            make_candidate("t2", true, "reply"),
            make_candidate("t3", false, "reply"),
        ]);

        assert_eq!(actionable, vec!["t1", "t3"]);
    }

    #[test]
    fn filter_actionable_excludes_skip_action() {
        let actionable = actionable_ids(&[
            make_candidate("t1", false, "reply"),
            make_candidate("t2", false, "skip"),
            make_candidate("t3", false, "like"),
        ]);

        assert_eq!(actionable, vec!["t1", "t3"]);
    }

    #[test]
    fn filter_actionable_all_replied_returns_empty() {
        let actionable = actionable_ids(&[
            make_candidate("t1", true, "reply"),
            make_candidate("t2", true, "reply"),
        ]);

        assert!(actionable.is_empty());
    }

    #[test]
    fn filter_actionable_all_skip_returns_empty() {
        let actionable = actionable_ids(&[
            make_candidate("t1", false, "skip"),
            make_candidate("t2", false, "skip"),
        ]);

        assert!(actionable.is_empty());
    }

    // ── Queue item collection from drafts ───────────────────────────

    #[test]
    fn collect_queue_items_from_drafts() {
        let first_ok = DraftResult::Success {
            candidate_id: "t1".to_string(),
            draft_text: "Great point!".to_string(),
            archetype: "agree_and_expand".to_string(),
            char_count: 12,
            confidence: "high".to_string(),
            risks: vec![],
            vault_citations: vec![],
        };
        let failed = DraftResult::Error {
            candidate_id: "t2".to_string(),
            error_code: "llm_error".to_string(),
            error_message: "timeout".to_string(),
        };
        let second_ok = DraftResult::Success {
            candidate_id: "t3".to_string(),
            draft_text: "Interesting!".to_string(),
            archetype: "ask_question".to_string(),
            char_count: 12,
            confidence: "medium".to_string(),
            risks: vec![],
            vault_citations: vec![],
        };
        let drafts = vec![first_ok, failed, second_ok];

        // Only successful drafts become queue items; errors are dropped.
        let mut queue_items: Vec<QueueItem> = Vec::new();
        for draft in &drafts {
            if let DraftResult::Success {
                candidate_id,
                draft_text,
                ..
            } = draft
            {
                queue_items.push(QueueItem {
                    candidate_id: candidate_id.clone(),
                    pre_drafted_text: Some(draft_text.clone()),
                });
            }
        }

        assert_eq!(queue_items.len(), 2);
        assert_eq!(queue_items[0].candidate_id, "t1");
        assert_eq!(
            queue_items[0].pre_drafted_text.as_deref(),
            Some("Great point!")
        );
        assert_eq!(queue_items[1].candidate_id, "t3");

        let drafts_generated = queue_items.len();
        let drafts_failed = drafts.len() - drafts_generated;
        assert_eq!(drafts_generated, 2);
        assert_eq!(drafts_failed, 1);
    }

    // ── Summary counting from ProposeResult ─────────────────────────

    #[test]
    fn count_propose_results() {
        let results = vec![
            ProposeResult::Queued {
                candidate_id: "t1".to_string(),
                approval_queue_id: 1,
            },
            ProposeResult::Executed {
                candidate_id: "t2".to_string(),
                reply_tweet_id: "r1".to_string(),
            },
            ProposeResult::Blocked {
                candidate_id: "t3".to_string(),
                reason: "rate limit".to_string(),
            },
            ProposeResult::Queued {
                candidate_id: "t4".to_string(),
                approval_queue_id: 2,
            },
        ];

        // One counting routine, parameterised by the variant predicate.
        let count_where =
            |is_variant: fn(&ProposeResult) -> bool| results.iter().filter(|r| is_variant(r)).count();

        let queued = count_where(|r| matches!(r, ProposeResult::Queued { .. }));
        let executed = count_where(|r| matches!(r, ProposeResult::Executed { .. }));
        let blocked = count_where(|r| matches!(r, ProposeResult::Blocked { .. }));

        assert_eq!(queued, 2);
        assert_eq!(executed, 1);
        assert_eq!(blocked, 1);
    }

    // ── CycleInput construction ─────────────────────────────────────

    #[test]
    fn cycle_input_defaults() {
        let input = CycleInput {
            query: None,
            min_score: None,
            limit: None,
            since_id: None,
            mention_product: false,
        };

        assert!(input.query.is_none());
        assert!(!input.mention_product);
    }

    #[test]
    fn cycle_input_with_all_fields() {
        let input = CycleInput {
            query: Some("rust async".to_string()),
            min_score: Some(50.0),
            limit: Some(20),
            since_id: Some("12345".to_string()),
            mention_product: true,
        };

        assert_eq!(input.query.as_deref(), Some("rust async"));
        assert_eq!(input.min_score, Some(50.0));
        assert_eq!(input.limit, Some(20));
        assert!(input.mention_product);
    }

    // ── CycleSummary zeroed report ──────────────────────────────────

    #[test]
    fn cycle_summary_empty() {
        let summary = zero_summary();

        assert_eq!(summary.candidates_found, 0);
        assert_eq!(summary.drafts_generated, 0);
        assert_eq!(summary.replies_queued, 0);
    }

    // ── CycleReport with no candidates ──────────────────────────────

    #[test]
    fn cycle_report_empty_candidates() {
        let report = CycleReport {
            query_used: "rust".to_string(),
            discovered: Vec::new(),
            drafts: Vec::new(),
            queued: Vec::new(),
            summary: zero_summary(),
        };

        assert_eq!(report.query_used, "rust");
        assert!(report.discovered.is_empty());
        assert!(report.drafts.is_empty());
        assert!(report.queued.is_empty());
    }
}