1pub mod adapters;
18pub mod analytics_loop;
19pub mod approval_poster;
20pub mod circuit_breaker;
21pub mod content_loop;
22pub mod discovery_loop;
23pub mod loop_helpers;
24pub mod mentions_loop;
25pub mod posting_queue;
26pub mod schedule;
27pub mod scheduler;
28pub mod seed_worker;
29pub mod status_reporter;
30pub mod target_loop;
31pub mod thread_loop;
32pub mod watchtower;
33
34pub use analytics_loop::{
35 AnalyticsError, AnalyticsLoop, AnalyticsStorage, AnalyticsSummary, EngagementFetcher,
36 ProfileFetcher, ProfileMetrics, TweetMetrics,
37};
38pub use approval_poster::run_approval_poster;
39pub use content_loop::{ContentLoop, ContentResult};
40pub use discovery_loop::{DiscoveryLoop, DiscoveryResult, DiscoverySummary};
41pub use loop_helpers::{
42 ConsecutiveErrorTracker, ContentLoopError, ContentSafety, ContentStorage, LoopError,
43 LoopStorage, LoopTweet, MentionsFetcher, PostSender, ReplyGenerator, SafetyChecker,
44 ScoreResult, ThreadPoster, TopicScorer, TweetGenerator, TweetScorer, TweetSearcher,
45};
46pub use mentions_loop::{MentionResult, MentionsLoop};
47pub use posting_queue::{
48 create_posting_queue, run_posting_queue_with_approval, ApprovalQueue, PostAction, PostExecutor,
49 QUEUE_CAPACITY,
50};
51pub use schedule::{schedule_gate, ActiveSchedule};
52pub use scheduler::{scheduler_from_config, LoopScheduler};
53pub use seed_worker::SeedWorker;
54pub use status_reporter::{ActionCounts, StatusQuerier};
55pub use target_loop::{
56 TargetLoop, TargetLoopConfig, TargetResult, TargetStorage, TargetTweetFetcher,
57 TargetUserManager,
58};
59pub use thread_loop::{ThreadGenerator, ThreadLoop, ThreadResult};
60pub use watchtower::{IngestSummary, WatchtowerError, WatchtowerLoop};
61
62use std::future::Future;
63use std::sync::Arc;
64use std::time::Duration;
65use tokio::task::JoinHandle;
66use tokio_util::sync::CancellationToken;
67
68use crate::x_api::auth::TokenManager;
69use crate::x_api::XApiHttpClient;
70
71pub async fn run_token_refresh_loop(
78 token_manager: Arc<TokenManager>,
79 x_client: Arc<XApiHttpClient>,
80 cancel: CancellationToken,
81) {
82 let interval = Duration::from_secs(60);
83 loop {
84 tokio::select! {
85 () = cancel.cancelled() => {
86 tracing::debug!("Token refresh loop cancelled");
87 return;
88 }
89 () = tokio::time::sleep(interval) => {}
90 }
91
92 match token_manager.refresh_if_needed().await {
93 Ok(()) => {
94 let token = token_manager
96 .tokens_lock()
97 .read()
98 .await
99 .access_token
100 .clone();
101 x_client.set_access_token(token).await;
102 }
103 Err(crate::error::XApiError::AuthExpired) => {
104 tracing::error!(
105 "Token refresh failed: authentication expired. \
106 Run `tuitbot auth` to re-authenticate. Shutting down."
107 );
108 cancel.cancel();
109 return;
110 }
111 Err(e) => {
112 tracing::warn!(error = %e, "Token refresh attempt failed, will retry next cycle");
113 }
114 }
115 }
116}
117
118pub struct Runtime {
125 cancel: CancellationToken,
126 handles: Vec<(String, JoinHandle<()>)>,
127}
128
129impl Runtime {
130 pub fn new() -> Self {
132 Self {
133 cancel: CancellationToken::new(),
134 handles: Vec::new(),
135 }
136 }
137
138 pub fn cancel_token(&self) -> CancellationToken {
140 self.cancel.clone()
141 }
142
143 pub fn spawn<F>(&mut self, name: impl Into<String>, future: F)
149 where
150 F: Future<Output = ()> + Send + 'static,
151 {
152 let name = name.into();
153 tracing::info!(task = %name, "Spawning automation task");
154 let handle = tokio::spawn(future);
155 self.handles.push((name, handle));
156 }
157
158 pub fn task_count(&self) -> usize {
160 self.handles.len()
161 }
162
163 pub async fn shutdown(&mut self) {
169 tracing::info!("Initiating graceful shutdown");
170 self.cancel.cancel();
171
172 let timeout_duration = Duration::from_secs(30);
173 let handles: Vec<_> = self.handles.drain(..).collect();
174
175 let shutdown = async {
176 for (name, handle) in handles {
177 match handle.await {
178 Ok(()) => tracing::info!(task = %name, "Task completed cleanly"),
179 Err(e) => {
180 tracing::warn!(task = %name, error = %e, "Task panicked during shutdown")
181 }
182 }
183 }
184 };
185
186 if tokio::time::timeout(timeout_duration, shutdown)
187 .await
188 .is_err()
189 {
190 tracing::warn!("Shutdown timeout exceeded (30s), some tasks may still be running");
191 } else {
192 tracing::info!("Graceful shutdown complete");
193 }
194 }
195
196 pub async fn run_until_shutdown(mut self) {
203 wait_for_shutdown_signal().await;
204 self.shutdown().await;
205 }
206}
207
208impl Default for Runtime {
209 fn default() -> Self {
210 Self::new()
211 }
212}
213
214pub async fn wait_for_shutdown_signal() {
219 #[cfg(unix)]
220 {
221 use tokio::signal::unix::{signal, SignalKind};
222
223 let mut sigterm = match signal(SignalKind::terminate()) {
224 Ok(s) => s,
225 Err(e) => {
226 tracing::warn!(
227 error = %e,
228 "Failed to register SIGTERM handler, using Ctrl+C only"
229 );
230 if let Err(e) = tokio::signal::ctrl_c().await {
231 tracing::error!(error = %e, "Failed to listen for Ctrl+C");
232 } else {
233 tracing::info!("Received Ctrl+C");
234 }
235 return;
236 }
237 };
238
239 tokio::select! {
240 result = tokio::signal::ctrl_c() => {
241 if let Err(e) = result {
242 tracing::error!(error = %e, "Ctrl+C handler error");
243 }
244 tracing::info!("Received Ctrl+C");
245 }
246 _ = sigterm.recv() => {
247 tracing::info!("Received SIGTERM");
248 }
249 }
250 }
251
252 #[cfg(not(unix))]
253 {
254 if let Err(e) = tokio::signal::ctrl_c().await {
255 tracing::error!(error = %e, "Failed to listen for Ctrl+C");
256 } else {
257 tracing::info!("Received Ctrl+C");
258 }
259 }
260}
261
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
    use std::sync::Arc;

    /// A spawned task runs, is counted, and is drained by `shutdown`.
    #[tokio::test]
    async fn spawn_and_cancel() {
        let mut rt = Runtime::new();
        let token = rt.cancel_token();
        let started = Arc::new(AtomicBool::new(false));

        let started_flag = Arc::clone(&started);
        rt.spawn("test-task", async move {
            started_flag.store(true, Ordering::SeqCst);
            token.cancelled().await;
        });
        assert_eq!(rt.task_count(), 1);

        // Give the scheduler a moment to poll the task.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(started.load(Ordering::SeqCst));

        rt.shutdown().await;
        assert_eq!(rt.task_count(), 0);
    }

    /// Several tasks sharing the token all start and are all drained.
    #[tokio::test]
    async fn multiple_tasks_all_stopped() {
        let mut rt = Runtime::new();
        let started_count = Arc::new(AtomicU32::new(0));

        for idx in 0..5 {
            let token = rt.cancel_token();
            let tally = Arc::clone(&started_count);
            rt.spawn(format!("task-{idx}"), async move {
                tally.fetch_add(1, Ordering::SeqCst);
                token.cancelled().await;
            });
        }
        assert_eq!(rt.task_count(), 5);

        // Give the scheduler a moment to poll every task.
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(started_count.load(Ordering::SeqCst), 5);

        rt.shutdown().await;
        assert_eq!(rt.task_count(), 0);
    }

    /// Shutdown returns promptly when tasks react quickly to cancellation.
    #[tokio::test]
    async fn shutdown_completes_quickly_for_fast_tasks() {
        let mut rt = Runtime::new();
        let token = rt.cancel_token();

        rt.spawn("quick-task", async move {
            token.cancelled().await;
            // Simulate a short cleanup step after cancellation.
            tokio::time::sleep(Duration::from_millis(10)).await;
        });

        let began = tokio::time::Instant::now();
        rt.shutdown().await;

        assert!(began.elapsed() < Duration::from_secs(1));
    }

    /// Shutdown must not hang or panic when a task already finished.
    #[tokio::test]
    async fn shutdown_handles_already_completed_tasks() {
        let mut rt = Runtime::new();

        rt.spawn("instant-task", async {});

        // Let the task run to completion before shutting down.
        tokio::time::sleep(Duration::from_millis(20)).await;

        rt.shutdown().await;
    }

    /// Tokens handed out by the runtime share one cancellation state.
    #[tokio::test]
    async fn cancel_token_is_shared() {
        let rt = Runtime::new();
        let first = rt.cancel_token();
        let second = rt.cancel_token();

        assert!(!first.is_cancelled());
        assert!(!second.is_cancelled());

        first.cancel();

        assert!(first.is_cancelled());
        assert!(second.is_cancelled());
    }

    /// `Default` yields an empty, non-cancelled runtime.
    #[tokio::test]
    async fn default_impl() {
        let rt = Runtime::default();
        assert_eq!(rt.task_count(), 0);
        assert!(!rt.cancel_token().is_cancelled());
    }
}