Skip to main content

tuitbot_core/automation/
mod.rs

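//! Automation runtime, engagement loops, and content generation.
//!
//! This module contains the automation runtime for managing concurrent task
//! lifecycles, the core engagement engine (tweet discovery and mention replies),
//! and original content generation (educational tweets and threads).
//!
//! Submodules:
//! - [`scheduler`]: Loop scheduler with configurable interval and jitter.
//! - [`posting_queue`]: Serialized posting queue for concurrent loops.
//! - [`status_reporter`]: Periodic action count summaries.
//! - [`loop_helpers`]: Shared types, traits, and error handling for loops.
//! - [`mentions_loop`]: Monitors @-mentions and generates replies.
//! - [`discovery_loop`]: Searches tweets by keyword, scores, and replies.
//! - [`content_loop`]: Generates and posts educational tweets.
//! - [`thread_loop`]: Generates and posts multi-tweet threads.
//!
//! Further submodules declared below (descriptions limited to what this
//! module re-exports from them):
//! - [`adapters`]: Nothing is re-exported from it at this level.
//! - [`analytics_loop`]: [`AnalyticsLoop`] and its supporting types.
//! - [`approval_poster`]: The [`run_approval_poster`] entry point.
//! - [`circuit_breaker`]: Nothing is re-exported from it at this level.
//! - [`schedule`]: [`schedule_gate`] and [`ActiveSchedule`].
//! - [`seed_worker`]: [`SeedWorker`].
//! - [`target_loop`]: [`TargetLoop`] and its supporting types.
//! - [`watchtower`]: [`WatchtowerLoop`], [`IngestSummary`], and [`WatchtowerError`].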
16
17pub mod adapters;
18pub mod analytics_loop;
19pub mod approval_poster;
20pub mod circuit_breaker;
21pub mod content_loop;
22pub mod discovery_loop;
23pub mod loop_helpers;
24pub mod mentions_loop;
25pub mod posting_queue;
26pub mod schedule;
27pub mod scheduler;
28pub mod seed_worker;
29pub mod status_reporter;
30pub mod target_loop;
31pub mod thread_loop;
32pub mod watchtower;
33
34pub use analytics_loop::{
35    AnalyticsError, AnalyticsLoop, AnalyticsStorage, AnalyticsSummary, EngagementFetcher,
36    ProfileFetcher, ProfileMetrics, TweetMetrics,
37};
38pub use approval_poster::run_approval_poster;
39pub use content_loop::{ContentLoop, ContentResult};
40pub use discovery_loop::{DiscoveryLoop, DiscoveryResult, DiscoverySummary};
41pub use loop_helpers::{
42    ConsecutiveErrorTracker, ContentLoopError, ContentSafety, ContentStorage, LoopError,
43    LoopStorage, LoopTweet, MentionsFetcher, PostSender, ReplyGenerator, SafetyChecker,
44    ScoreResult, ThreadPoster, TopicScorer, TweetGenerator, TweetScorer, TweetSearcher,
45};
46pub use mentions_loop::{MentionResult, MentionsLoop};
47pub use posting_queue::{
48    create_posting_queue, run_posting_queue_with_approval, ApprovalQueue, PostAction, PostExecutor,
49    QUEUE_CAPACITY,
50};
51pub use schedule::{schedule_gate, ActiveSchedule};
52pub use scheduler::{scheduler_from_config, LoopScheduler};
53pub use seed_worker::SeedWorker;
54pub use status_reporter::{ActionCounts, StatusQuerier};
55pub use target_loop::{
56    TargetLoop, TargetLoopConfig, TargetResult, TargetStorage, TargetTweetFetcher,
57    TargetUserManager,
58};
59pub use thread_loop::{ThreadGenerator, ThreadLoop, ThreadResult};
60pub use watchtower::{IngestSummary, WatchtowerError, WatchtowerLoop};
61
62use std::future::Future;
63use std::sync::Arc;
64use std::time::Duration;
65use tokio::task::JoinHandle;
66use tokio_util::sync::CancellationToken;
67
68use crate::x_api::auth::TokenManager;
69use crate::x_api::XApiHttpClient;
70
71/// Background loop that refreshes the X API OAuth token before it expires.
72///
73/// Checks every 60 seconds whether the token is within 5 minutes of expiry.
74/// On successful refresh, updates the `XApiHttpClient`'s bearer token.
75/// On `AuthExpired` error (refresh token revoked), cancels the runtime for
76/// graceful shutdown.
77pub async fn run_token_refresh_loop(
78    token_manager: Arc<TokenManager>,
79    x_client: Arc<XApiHttpClient>,
80    cancel: CancellationToken,
81) {
82    let interval = Duration::from_secs(60);
83    loop {
84        tokio::select! {
85            () = cancel.cancelled() => {
86                tracing::debug!("Token refresh loop cancelled");
87                return;
88            }
89            () = tokio::time::sleep(interval) => {}
90        }
91
92        match token_manager.refresh_if_needed().await {
93            Ok(()) => {
94                // Update the HTTP client's bearer token with whatever is current.
95                let token = token_manager
96                    .tokens_lock()
97                    .read()
98                    .await
99                    .access_token
100                    .clone();
101                x_client.set_access_token(token).await;
102            }
103            Err(crate::error::XApiError::AuthExpired) => {
104                tracing::error!(
105                    "Token refresh failed: authentication expired. \
106                     Run `tuitbot auth` to re-authenticate. Shutting down."
107                );
108                cancel.cancel();
109                return;
110            }
111            Err(e) => {
112                tracing::warn!(error = %e, "Token refresh attempt failed, will retry next cycle");
113            }
114        }
115    }
116}
117
118/// Automation runtime that manages concurrent task lifecycles.
119///
120/// The runtime owns a `CancellationToken` shared by all spawned tasks
121/// and collects their `JoinHandle`s for graceful shutdown. It does not
122/// own specific business dependencies -- those are passed when spawning
123/// individual loops.
124pub struct Runtime {
125    cancel: CancellationToken,
126    handles: Vec<(String, JoinHandle<()>)>,
127}
128
129impl Runtime {
130    /// Create a new runtime with a fresh cancellation token.
131    pub fn new() -> Self {
132        Self {
133            cancel: CancellationToken::new(),
134            handles: Vec::new(),
135        }
136    }
137
138    /// Return a clone of the cancellation token for passing to tasks.
139    pub fn cancel_token(&self) -> CancellationToken {
140        self.cancel.clone()
141    }
142
143    /// Spawn an automation task with a descriptive name.
144    ///
145    /// The task's `JoinHandle` is tracked for shutdown. The task should
146    /// check `CancellationToken::is_cancelled()` in its loop to exit
147    /// gracefully when shutdown is initiated.
148    pub fn spawn<F>(&mut self, name: impl Into<String>, future: F)
149    where
150        F: Future<Output = ()> + Send + 'static,
151    {
152        let name = name.into();
153        tracing::info!(task = %name, "Spawning automation task");
154        let handle = tokio::spawn(future);
155        self.handles.push((name, handle));
156    }
157
158    /// Return the number of spawned tasks.
159    pub fn task_count(&self) -> usize {
160        self.handles.len()
161    }
162
163    /// Initiate graceful shutdown.
164    ///
165    /// 1. Cancels the token, signaling all tasks to stop.
166    /// 2. Awaits all `JoinHandle`s with a 30-second timeout.
167    /// 3. If timeout is exceeded, logs a warning (caller decides whether to force-exit).
168    pub async fn shutdown(&mut self) {
169        tracing::info!("Initiating graceful shutdown");
170        self.cancel.cancel();
171
172        let timeout_duration = Duration::from_secs(30);
173        let handles: Vec<_> = self.handles.drain(..).collect();
174
175        let shutdown = async {
176            for (name, handle) in handles {
177                match handle.await {
178                    Ok(()) => tracing::info!(task = %name, "Task completed cleanly"),
179                    Err(e) => {
180                        tracing::warn!(task = %name, error = %e, "Task panicked during shutdown")
181                    }
182                }
183            }
184        };
185
186        if tokio::time::timeout(timeout_duration, shutdown)
187            .await
188            .is_err()
189        {
190            tracing::warn!("Shutdown timeout exceeded (30s), some tasks may still be running");
191        } else {
192            tracing::info!("Graceful shutdown complete");
193        }
194    }
195
196    /// Block until a shutdown signal is received, then gracefully stop all tasks.
197    ///
198    /// This is the typical entry point for the `tuitbot run` command:
199    /// 1. Spawn all tasks.
200    /// 2. Call `run_until_shutdown()` to block until Ctrl+C / SIGTERM.
201    /// 3. All tasks are stopped and awaited.
202    pub async fn run_until_shutdown(mut self) {
203        wait_for_shutdown_signal().await;
204        self.shutdown().await;
205    }
206}
207
208impl Default for Runtime {
209    fn default() -> Self {
210        Self::new()
211    }
212}
213
214/// Wait for an OS shutdown signal (Ctrl+C or SIGTERM).
215///
216/// On Unix, listens for both Ctrl+C and SIGTERM. On Windows, listens
217/// for Ctrl+C only (SIGTERM is not available).
218pub async fn wait_for_shutdown_signal() {
219    #[cfg(unix)]
220    {
221        use tokio::signal::unix::{signal, SignalKind};
222
223        let mut sigterm = match signal(SignalKind::terminate()) {
224            Ok(s) => s,
225            Err(e) => {
226                tracing::warn!(
227                    error = %e,
228                    "Failed to register SIGTERM handler, using Ctrl+C only"
229                );
230                if let Err(e) = tokio::signal::ctrl_c().await {
231                    tracing::error!(error = %e, "Failed to listen for Ctrl+C");
232                } else {
233                    tracing::info!("Received Ctrl+C");
234                }
235                return;
236            }
237        };
238
239        tokio::select! {
240            result = tokio::signal::ctrl_c() => {
241                if let Err(e) = result {
242                    tracing::error!(error = %e, "Ctrl+C handler error");
243                }
244                tracing::info!("Received Ctrl+C");
245            }
246            _ = sigterm.recv() => {
247                tracing::info!("Received SIGTERM");
248            }
249        }
250    }
251
252    #[cfg(not(unix))]
253    {
254        if let Err(e) = tokio::signal::ctrl_c().await {
255            tracing::error!(error = %e, "Failed to listen for Ctrl+C");
256        } else {
257            tracing::info!("Received Ctrl+C");
258        }
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::sync::Arc;

    /// A spawned task runs, and `shutdown` stops it and clears the task list.
    #[tokio::test]
    async fn spawn_and_cancel() {
        let mut rt = Runtime::new();
        let token = rt.cancel_token();
        let started = Arc::new(AtomicBool::new(false));

        let started_in_task = Arc::clone(&started);
        rt.spawn("test-task", async move {
            started_in_task.store(true, Ordering::SeqCst);
            token.cancelled().await;
        });
        assert_eq!(rt.task_count(), 1);

        // Allow the scheduler a moment to poll the new task.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(started.load(Ordering::SeqCst));

        rt.shutdown().await;
        assert_eq!(rt.task_count(), 0);
    }

    /// Several tasks sharing the token all start and are all drained on shutdown.
    #[tokio::test]
    async fn multiple_tasks_all_stopped() {
        let mut rt = Runtime::new();
        let started_count = Arc::new(AtomicU32::new(0));

        for i in 0..5 {
            let token = rt.cancel_token();
            let tally = Arc::clone(&started_count);
            rt.spawn(format!("task-{i}"), async move {
                tally.fetch_add(1, Ordering::SeqCst);
                token.cancelled().await;
            });
        }
        assert_eq!(rt.task_count(), 5);

        // Wait until every task has had a chance to run its first step.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(started_count.load(Ordering::SeqCst), 5);

        rt.shutdown().await;
        assert_eq!(rt.task_count(), 0);
    }

    /// Shutdown returns promptly when tasks only need brief cleanup.
    #[tokio::test]
    async fn shutdown_completes_quickly_for_fast_tasks() {
        let mut rt = Runtime::new();
        let token = rt.cancel_token();

        rt.spawn("quick-task", async move {
            token.cancelled().await;
            // Stand-in for a short cleanup step after cancellation.
            tokio::time::sleep(Duration::from_millis(10)).await;
        });

        let began = tokio::time::Instant::now();
        rt.shutdown().await;

        // Far below the 30s shutdown timeout.
        assert!(began.elapsed() < Duration::from_secs(1));
    }

    /// Tasks that finished before shutdown are joined without trouble.
    #[tokio::test]
    async fn shutdown_handles_already_completed_tasks() {
        let mut rt = Runtime::new();

        // The task body is empty, so it completes on its first poll.
        rt.spawn("instant-task", async {});

        // Give it time to complete before shutting down.
        tokio::time::sleep(Duration::from_millis(20)).await;

        rt.shutdown().await;
    }

    /// Every token handed out by the runtime observes the same cancellation.
    #[tokio::test]
    async fn cancel_token_is_shared() {
        let rt = Runtime::new();
        let first = rt.cancel_token();
        let second = rt.cancel_token();

        assert!(!first.is_cancelled());
        assert!(!second.is_cancelled());

        first.cancel();

        assert!(first.is_cancelled());
        assert!(second.is_cancelled());
    }

    /// `Runtime::default()` starts empty and not cancelled.
    #[tokio::test]
    async fn default_impl() {
        let rt = Runtime::default();
        assert_eq!(rt.task_count(), 0);
        assert!(!rt.cancel_token().is_cancelled());
    }
}