Skip to main content

tuitbot_core/automation/
mod.rs

1//! Automation runtime, engagement loops, and content generation.
2//!
3//! This module contains the automation runtime for managing concurrent task
4//! lifecycles, the core engagement engine (tweet discovery and mention replies),
5//! and original content generation (educational tweets and threads).
6//!
7//! Submodules:
8//! - [`scheduler`]: Loop scheduler with configurable interval and jitter.
9//! - [`posting_queue`]: Serialized posting queue for concurrent loops.
10//! - [`status_reporter`]: Periodic action count summaries.
11//! - [`loop_helpers`]: Shared types, traits, and error handling for loops.
12//! - [`mentions_loop`]: Monitors @-mentions and generates replies.
13//! - [`discovery_loop`]: Searches tweets by keyword, scores, and replies.
14//! - [`content_loop`]: Generates and posts educational tweets.
15//! - [`thread_loop`]: Generates and posts multi-tweet threads.
16
17pub mod adapters;
18pub mod analytics_loop;
19pub mod approval_poster;
20pub mod circuit_breaker;
21pub mod content_loop;
22pub mod discovery_loop;
23pub mod loop_helpers;
24pub mod mentions_loop;
25pub mod posting_queue;
26#[cfg(test)]
27mod safety_guardrails_tests; // Task 3.5: production-limits safety guardrail tests
28pub mod schedule;
29pub mod scheduler;
30pub mod seed_worker;
31pub mod status_reporter;
32pub mod target_loop;
33pub mod thread_loop;
34pub mod watchtower;
35
36pub use analytics_loop::{
37    AnalyticsError, AnalyticsLoop, AnalyticsStorage, AnalyticsSummary, EngagementFetcher,
38    ProfileFetcher, ProfileMetrics, TweetMetrics,
39};
40pub use approval_poster::run_approval_poster;
41pub use content_loop::{ContentLoop, ContentResult};
42pub use discovery_loop::{DiscoveryLoop, DiscoveryResult, DiscoverySummary};
43pub use loop_helpers::{
44    ConsecutiveErrorTracker, ContentLoopError, ContentSafety, ContentStorage, LoopError,
45    LoopStorage, LoopTweet, MentionsFetcher, PostSender, ReplyGenerator, SafetyChecker,
46    ScoreResult, ThreadPoster, TopicScorer, TweetGenerator, TweetScorer, TweetSearcher,
47};
48pub use mentions_loop::{MentionResult, MentionsLoop};
49pub use posting_queue::{
50    create_posting_queue, run_posting_queue, run_posting_queue_with_approval, ApprovalQueue,
51    PostAction, PostExecutor, QUEUE_CAPACITY,
52};
53pub use schedule::{schedule_gate, ActiveSchedule};
54pub use scheduler::{scheduler_from_config, LoopScheduler};
55pub use seed_worker::SeedWorker;
56pub use status_reporter::{ActionCounts, StatusQuerier};
57pub use target_loop::{
58    TargetLoop, TargetLoopConfig, TargetResult, TargetStorage, TargetTweetFetcher,
59    TargetUserManager,
60};
61pub use thread_loop::{ThreadGenerator, ThreadLoop, ThreadResult};
62pub use watchtower::{IngestSummary, WatchtowerError, WatchtowerLoop};
63
64use std::future::Future;
65use std::sync::Arc;
66use std::time::Duration;
67use tokio::task::JoinHandle;
68use tokio_util::sync::CancellationToken;
69
70use crate::x_api::auth::TokenManager;
71use crate::x_api::XApiHttpClient;
72
73/// Background loop that refreshes the X API OAuth token before it expires.
74///
75/// Checks every 60 seconds whether the token is within 5 minutes of expiry.
76/// On successful refresh, updates the `XApiHttpClient`'s bearer token.
77/// On `AuthExpired` error (refresh token revoked), cancels the runtime for
78/// graceful shutdown.
79pub async fn run_token_refresh_loop(
80    token_manager: Arc<TokenManager>,
81    x_client: Arc<XApiHttpClient>,
82    cancel: CancellationToken,
83) {
84    let interval = Duration::from_secs(60);
85    loop {
86        tokio::select! {
87            () = cancel.cancelled() => {
88                tracing::debug!("Token refresh loop cancelled");
89                return;
90            }
91            () = tokio::time::sleep(interval) => {}
92        }
93
94        match token_manager.refresh_if_needed().await {
95            Ok(()) => {
96                // Update the HTTP client's bearer token with whatever is current.
97                let token = token_manager
98                    .tokens_lock()
99                    .read()
100                    .await
101                    .access_token
102                    .clone();
103                x_client.set_access_token(token).await;
104            }
105            Err(crate::error::XApiError::AuthExpired) => {
106                tracing::error!(
107                    "Token refresh failed: authentication expired. \
108                     Run `tuitbot auth` to re-authenticate. Shutting down."
109                );
110                cancel.cancel();
111                return;
112            }
113            Err(e) => {
114                tracing::warn!(error = %e, "Token refresh attempt failed, will retry next cycle");
115            }
116        }
117    }
118}
119
120/// Automation runtime that manages concurrent task lifecycles.
121///
122/// The runtime owns a `CancellationToken` shared by all spawned tasks
123/// and collects their `JoinHandle`s for graceful shutdown. It does not
124/// own specific business dependencies -- those are passed when spawning
125/// individual loops.
126pub struct Runtime {
127    cancel: CancellationToken,
128    handles: Vec<(String, JoinHandle<()>)>,
129}
130
131impl Runtime {
132    /// Create a new runtime with a fresh cancellation token.
133    pub fn new() -> Self {
134        Self {
135            cancel: CancellationToken::new(),
136            handles: Vec::new(),
137        }
138    }
139
140    /// Return a clone of the cancellation token for passing to tasks.
141    pub fn cancel_token(&self) -> CancellationToken {
142        self.cancel.clone()
143    }
144
145    /// Spawn an automation task with a descriptive name.
146    ///
147    /// The task's `JoinHandle` is tracked for shutdown. The task should
148    /// check `CancellationToken::is_cancelled()` in its loop to exit
149    /// gracefully when shutdown is initiated.
150    pub fn spawn<F>(&mut self, name: impl Into<String>, future: F)
151    where
152        F: Future<Output = ()> + Send + 'static,
153    {
154        let name = name.into();
155        tracing::info!(task = %name, "Spawning automation task");
156        let handle = tokio::spawn(future);
157        self.handles.push((name, handle));
158    }
159
160    /// Return the number of spawned tasks.
161    pub fn task_count(&self) -> usize {
162        self.handles.len()
163    }
164
165    /// Initiate graceful shutdown.
166    ///
167    /// 1. Cancels the token, signaling all tasks to stop.
168    /// 2. Awaits all `JoinHandle`s with a 30-second timeout.
169    /// 3. If timeout is exceeded, logs a warning (caller decides whether to force-exit).
170    pub async fn shutdown(&mut self) {
171        tracing::info!("Initiating graceful shutdown");
172        self.cancel.cancel();
173
174        let timeout_duration = Duration::from_secs(30);
175        let handles: Vec<_> = self.handles.drain(..).collect();
176
177        let shutdown = async {
178            for (name, handle) in handles {
179                match handle.await {
180                    Ok(()) => tracing::info!(task = %name, "Task completed cleanly"),
181                    Err(e) => {
182                        tracing::warn!(task = %name, error = %e, "Task panicked during shutdown")
183                    }
184                }
185            }
186        };
187
188        if tokio::time::timeout(timeout_duration, shutdown)
189            .await
190            .is_err()
191        {
192            tracing::warn!("Shutdown timeout exceeded (30s), some tasks may still be running");
193        } else {
194            tracing::info!("Graceful shutdown complete");
195        }
196    }
197
198    /// Block until a shutdown signal is received, then gracefully stop all tasks.
199    ///
200    /// This is the typical entry point for the `tuitbot run` command:
201    /// 1. Spawn all tasks.
202    /// 2. Call `run_until_shutdown()` to block until Ctrl+C / SIGTERM.
203    /// 3. All tasks are stopped and awaited.
204    pub async fn run_until_shutdown(mut self) {
205        wait_for_shutdown_signal().await;
206        self.shutdown().await;
207    }
208}
209
210impl Default for Runtime {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216/// Wait for an OS shutdown signal (Ctrl+C or SIGTERM).
217///
218/// On Unix, listens for both Ctrl+C and SIGTERM. On Windows, listens
219/// for Ctrl+C only (SIGTERM is not available).
220pub async fn wait_for_shutdown_signal() {
221    #[cfg(unix)]
222    {
223        use tokio::signal::unix::{signal, SignalKind};
224
225        let mut sigterm = match signal(SignalKind::terminate()) {
226            Ok(s) => s,
227            Err(e) => {
228                tracing::warn!(
229                    error = %e,
230                    "Failed to register SIGTERM handler, using Ctrl+C only"
231                );
232                if let Err(e) = tokio::signal::ctrl_c().await {
233                    tracing::error!(error = %e, "Failed to listen for Ctrl+C");
234                } else {
235                    tracing::info!("Received Ctrl+C");
236                }
237                return;
238            }
239        };
240
241        tokio::select! {
242            result = tokio::signal::ctrl_c() => {
243                if let Err(e) = result {
244                    tracing::error!(error = %e, "Ctrl+C handler error");
245                }
246                tracing::info!("Received Ctrl+C");
247            }
248            _ = sigterm.recv() => {
249                tracing::info!("Received SIGTERM");
250            }
251        }
252    }
253
254    #[cfg(not(unix))]
255    {
256        if let Err(e) = tokio::signal::ctrl_c().await {
257            tracing::error!(error = %e, "Failed to listen for Ctrl+C");
258        } else {
259            tracing::info!("Received Ctrl+C");
260        }
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
268    use std::sync::Arc;
269
270    #[tokio::test]
271    async fn spawn_and_cancel() {
272        let mut runtime = Runtime::new();
273        let cancel = runtime.cancel_token();
274        let ran = Arc::new(AtomicBool::new(false));
275
276        let ran_clone = ran.clone();
277        runtime.spawn("test-task", async move {
278            ran_clone.store(true, Ordering::SeqCst);
279            cancel.cancelled().await;
280        });
281
282        assert_eq!(runtime.task_count(), 1);
283
284        // Give task time to start
285        tokio::time::sleep(Duration::from_millis(20)).await;
286        assert!(ran.load(Ordering::SeqCst));
287
288        runtime.shutdown().await;
289        assert_eq!(runtime.task_count(), 0);
290    }
291
292    #[tokio::test]
293    async fn multiple_tasks_all_stopped() {
294        let mut runtime = Runtime::new();
295        let counter = Arc::new(AtomicU32::new(0));
296
297        for i in 0..5 {
298            let cancel = runtime.cancel_token();
299            let counter_clone = counter.clone();
300            runtime.spawn(format!("task-{i}"), async move {
301                counter_clone.fetch_add(1, Ordering::SeqCst);
302                cancel.cancelled().await;
303            });
304        }
305
306        assert_eq!(runtime.task_count(), 5);
307
308        // Let all tasks start
309        tokio::time::sleep(Duration::from_millis(50)).await;
310        assert_eq!(counter.load(Ordering::SeqCst), 5);
311
312        runtime.shutdown().await;
313        assert_eq!(runtime.task_count(), 0);
314    }
315
316    #[tokio::test]
317    async fn shutdown_completes_quickly_for_fast_tasks() {
318        let mut runtime = Runtime::new();
319        let cancel = runtime.cancel_token();
320
321        runtime.spawn("quick-task", async move {
322            cancel.cancelled().await;
323            // Simulate brief cleanup
324            tokio::time::sleep(Duration::from_millis(10)).await;
325        });
326
327        let start = tokio::time::Instant::now();
328        runtime.shutdown().await;
329        let elapsed = start.elapsed();
330
331        // Should complete well within the 30s timeout
332        assert!(elapsed < Duration::from_secs(1));
333    }
334
335    #[tokio::test]
336    async fn shutdown_handles_already_completed_tasks() {
337        let mut runtime = Runtime::new();
338
339        runtime.spawn("instant-task", async {
340            // Task that finishes immediately
341        });
342
343        // Let it finish
344        tokio::time::sleep(Duration::from_millis(20)).await;
345
346        // Shutdown should handle already-completed tasks gracefully
347        runtime.shutdown().await;
348    }
349
350    #[tokio::test]
351    async fn cancel_token_is_shared() {
352        let runtime = Runtime::new();
353        let t1 = runtime.cancel_token();
354        let t2 = runtime.cancel_token();
355
356        assert!(!t1.is_cancelled());
357        assert!(!t2.is_cancelled());
358
359        t1.cancel();
360
361        assert!(t1.is_cancelled());
362        assert!(t2.is_cancelled());
363    }
364
365    #[tokio::test]
366    async fn default_impl() {
367        let runtime = Runtime::default();
368        assert_eq!(runtime.task_count(), 0);
369        assert!(!runtime.cancel_token().is_cancelled());
370    }
371}