1pub mod adapters;
18pub mod analytics_loop;
19pub mod approval_poster;
20pub mod circuit_breaker;
21pub mod content_loop;
22pub mod discovery_loop;
23pub mod loop_helpers;
24pub mod mentions_loop;
25pub mod posting_queue;
26#[cfg(test)]
27mod safety_guardrails_tests; pub mod schedule;
29pub mod scheduler;
30pub mod seed_worker;
31pub mod status_reporter;
32pub mod target_loop;
33pub mod thread_loop;
34pub mod watchtower;
35
36pub use analytics_loop::{
37 AnalyticsError, AnalyticsLoop, AnalyticsStorage, AnalyticsSummary, EngagementFetcher,
38 ProfileFetcher, ProfileMetrics, TweetMetrics,
39};
40pub use approval_poster::run_approval_poster;
41pub use content_loop::{ContentLoop, ContentResult};
42pub use discovery_loop::{DiscoveryLoop, DiscoveryResult, DiscoverySummary};
43pub use loop_helpers::{
44 ConsecutiveErrorTracker, ContentLoopError, ContentSafety, ContentStorage, LoopError,
45 LoopStorage, LoopTweet, MentionsFetcher, PostSender, ReplyGenerator, SafetyChecker,
46 ScoreResult, ThreadPoster, TopicScorer, TweetGenerator, TweetScorer, TweetSearcher,
47};
48pub use mentions_loop::{MentionResult, MentionsLoop};
49pub use posting_queue::{
50 create_posting_queue, run_posting_queue, run_posting_queue_with_approval, ApprovalQueue,
51 PostAction, PostExecutor, QUEUE_CAPACITY,
52};
53pub use schedule::{schedule_gate, ActiveSchedule};
54pub use scheduler::{scheduler_from_config, LoopScheduler};
55pub use seed_worker::SeedWorker;
56pub use status_reporter::{ActionCounts, StatusQuerier};
57pub use target_loop::{
58 TargetLoop, TargetLoopConfig, TargetResult, TargetStorage, TargetTweetFetcher,
59 TargetUserManager,
60};
61pub use thread_loop::{ThreadGenerator, ThreadLoop, ThreadResult};
62pub use watchtower::{IngestSummary, WatchtowerError, WatchtowerLoop};
63
64use std::future::Future;
65use std::sync::Arc;
66use std::time::Duration;
67use tokio::task::JoinHandle;
68use tokio_util::sync::CancellationToken;
69
70use crate::x_api::auth::TokenManager;
71use crate::x_api::XApiHttpClient;
72
73pub async fn run_token_refresh_loop(
80 token_manager: Arc<TokenManager>,
81 x_client: Arc<XApiHttpClient>,
82 cancel: CancellationToken,
83) {
84 let interval = Duration::from_secs(60);
85 loop {
86 tokio::select! {
87 () = cancel.cancelled() => {
88 tracing::debug!("Token refresh loop cancelled");
89 return;
90 }
91 () = tokio::time::sleep(interval) => {}
92 }
93
94 match token_manager.refresh_if_needed().await {
95 Ok(()) => {
96 let token = token_manager
98 .tokens_lock()
99 .read()
100 .await
101 .access_token
102 .clone();
103 x_client.set_access_token(token).await;
104 }
105 Err(crate::error::XApiError::AuthExpired) => {
106 tracing::error!(
107 "Token refresh failed: authentication expired. \
108 Run `tuitbot auth` to re-authenticate. Shutting down."
109 );
110 cancel.cancel();
111 return;
112 }
113 Err(e) => {
114 tracing::warn!(error = %e, "Token refresh attempt failed, will retry next cycle");
115 }
116 }
117 }
118}
119
120pub struct Runtime {
127 cancel: CancellationToken,
128 handles: Vec<(String, JoinHandle<()>)>,
129}
130
131impl Runtime {
132 pub fn new() -> Self {
134 Self {
135 cancel: CancellationToken::new(),
136 handles: Vec::new(),
137 }
138 }
139
140 pub fn cancel_token(&self) -> CancellationToken {
142 self.cancel.clone()
143 }
144
145 pub fn spawn<F>(&mut self, name: impl Into<String>, future: F)
151 where
152 F: Future<Output = ()> + Send + 'static,
153 {
154 let name = name.into();
155 tracing::info!(task = %name, "Spawning automation task");
156 let handle = tokio::spawn(future);
157 self.handles.push((name, handle));
158 }
159
160 pub fn task_count(&self) -> usize {
162 self.handles.len()
163 }
164
165 pub async fn shutdown(&mut self) {
171 tracing::info!("Initiating graceful shutdown");
172 self.cancel.cancel();
173
174 let timeout_duration = Duration::from_secs(30);
175 let handles: Vec<_> = self.handles.drain(..).collect();
176
177 let shutdown = async {
178 for (name, handle) in handles {
179 match handle.await {
180 Ok(()) => tracing::info!(task = %name, "Task completed cleanly"),
181 Err(e) => {
182 tracing::warn!(task = %name, error = %e, "Task panicked during shutdown")
183 }
184 }
185 }
186 };
187
188 if tokio::time::timeout(timeout_duration, shutdown)
189 .await
190 .is_err()
191 {
192 tracing::warn!("Shutdown timeout exceeded (30s), some tasks may still be running");
193 } else {
194 tracing::info!("Graceful shutdown complete");
195 }
196 }
197
198 pub async fn run_until_shutdown(mut self) {
205 wait_for_shutdown_signal().await;
206 self.shutdown().await;
207 }
208}
209
210impl Default for Runtime {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216pub async fn wait_for_shutdown_signal() {
221 #[cfg(unix)]
222 {
223 use tokio::signal::unix::{signal, SignalKind};
224
225 let mut sigterm = match signal(SignalKind::terminate()) {
226 Ok(s) => s,
227 Err(e) => {
228 tracing::warn!(
229 error = %e,
230 "Failed to register SIGTERM handler, using Ctrl+C only"
231 );
232 if let Err(e) = tokio::signal::ctrl_c().await {
233 tracing::error!(error = %e, "Failed to listen for Ctrl+C");
234 } else {
235 tracing::info!("Received Ctrl+C");
236 }
237 return;
238 }
239 };
240
241 tokio::select! {
242 result = tokio::signal::ctrl_c() => {
243 if let Err(e) = result {
244 tracing::error!(error = %e, "Ctrl+C handler error");
245 }
246 tracing::info!("Received Ctrl+C");
247 }
248 _ = sigterm.recv() => {
249 tracing::info!("Received SIGTERM");
250 }
251 }
252 }
253
254 #[cfg(not(unix))]
255 {
256 if let Err(e) = tokio::signal::ctrl_c().await {
257 tracing::error!(error = %e, "Failed to listen for Ctrl+C");
258 } else {
259 tracing::info!("Received Ctrl+C");
260 }
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267 use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
268 use std::sync::Arc;
269
270 #[tokio::test]
271 async fn spawn_and_cancel() {
272 let mut runtime = Runtime::new();
273 let cancel = runtime.cancel_token();
274 let ran = Arc::new(AtomicBool::new(false));
275
276 let ran_clone = ran.clone();
277 runtime.spawn("test-task", async move {
278 ran_clone.store(true, Ordering::SeqCst);
279 cancel.cancelled().await;
280 });
281
282 assert_eq!(runtime.task_count(), 1);
283
284 tokio::time::sleep(Duration::from_millis(20)).await;
286 assert!(ran.load(Ordering::SeqCst));
287
288 runtime.shutdown().await;
289 assert_eq!(runtime.task_count(), 0);
290 }
291
292 #[tokio::test]
293 async fn multiple_tasks_all_stopped() {
294 let mut runtime = Runtime::new();
295 let counter = Arc::new(AtomicU32::new(0));
296
297 for i in 0..5 {
298 let cancel = runtime.cancel_token();
299 let counter_clone = counter.clone();
300 runtime.spawn(format!("task-{i}"), async move {
301 counter_clone.fetch_add(1, Ordering::SeqCst);
302 cancel.cancelled().await;
303 });
304 }
305
306 assert_eq!(runtime.task_count(), 5);
307
308 tokio::time::sleep(Duration::from_millis(50)).await;
310 assert_eq!(counter.load(Ordering::SeqCst), 5);
311
312 runtime.shutdown().await;
313 assert_eq!(runtime.task_count(), 0);
314 }
315
316 #[tokio::test]
317 async fn shutdown_completes_quickly_for_fast_tasks() {
318 let mut runtime = Runtime::new();
319 let cancel = runtime.cancel_token();
320
321 runtime.spawn("quick-task", async move {
322 cancel.cancelled().await;
323 tokio::time::sleep(Duration::from_millis(10)).await;
325 });
326
327 let start = tokio::time::Instant::now();
328 runtime.shutdown().await;
329 let elapsed = start.elapsed();
330
331 assert!(elapsed < Duration::from_secs(1));
333 }
334
335 #[tokio::test]
336 async fn shutdown_handles_already_completed_tasks() {
337 let mut runtime = Runtime::new();
338
339 runtime.spawn("instant-task", async {
340 });
342
343 tokio::time::sleep(Duration::from_millis(20)).await;
345
346 runtime.shutdown().await;
348 }
349
350 #[tokio::test]
351 async fn cancel_token_is_shared() {
352 let runtime = Runtime::new();
353 let t1 = runtime.cancel_token();
354 let t2 = runtime.cancel_token();
355
356 assert!(!t1.is_cancelled());
357 assert!(!t2.is_cancelled());
358
359 t1.cancel();
360
361 assert!(t1.is_cancelled());
362 assert!(t2.is_cancelled());
363 }
364
365 #[tokio::test]
366 async fn default_impl() {
367 let runtime = Runtime::default();
368 assert_eq!(runtime.task_count(), 0);
369 assert!(!runtime.cancel_token().is_cancelled());
370 }
371}