pub mod adapters;
pub mod analytics_loop;
pub mod approval_poster;
pub mod circuit_breaker;
pub mod content_loop;
pub mod discovery_loop;
pub mod loop_helpers;
pub mod mentions_loop;
pub mod posting_queue;
#[cfg(test)]
mod safety_guardrails_tests; pub mod schedule;
pub mod scheduler;
pub mod seed_worker;
pub mod status_reporter;
pub mod target_loop;
pub mod thread_loop;
pub mod watchtower;
pub use analytics_loop::{
AnalyticsError, AnalyticsLoop, AnalyticsStorage, AnalyticsSummary, EngagementFetcher,
ProfileFetcher, ProfileMetrics, TweetMetrics,
};
pub use approval_poster::run_approval_poster;
pub use content_loop::{ContentLoop, ContentResult};
pub use discovery_loop::{DiscoveryLoop, DiscoveryResult, DiscoverySummary};
pub use loop_helpers::{
ConsecutiveErrorTracker, ContentLoopError, ContentSafety, ContentStorage, LoopError,
LoopStorage, LoopTweet, MentionsFetcher, PostSender, ReplyGenerator, SafetyChecker,
ScoreResult, ThreadPoster, TopicScorer, TweetGenerator, TweetScorer, TweetSearcher,
};
pub use mentions_loop::{MentionResult, MentionsLoop};
pub use posting_queue::{
create_posting_queue, run_posting_queue, run_posting_queue_with_approval, ApprovalQueue,
PostAction, PostExecutor, QUEUE_CAPACITY,
};
pub use schedule::{schedule_gate, ActiveSchedule};
pub use scheduler::{scheduler_from_config, LoopScheduler};
pub use seed_worker::SeedWorker;
pub use status_reporter::{ActionCounts, StatusQuerier};
pub use target_loop::{
TargetLoop, TargetLoopConfig, TargetResult, TargetStorage, TargetTweetFetcher,
TargetUserManager,
};
pub use thread_loop::{ThreadGenerator, ThreadLoop, ThreadResult};
pub use watchtower::{IngestSummary, WatchtowerError, WatchtowerLoop};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::x_api::auth::TokenManager;
use crate::x_api::XApiHttpClient;
pub async fn run_token_refresh_loop(
token_manager: Arc<TokenManager>,
x_client: Arc<XApiHttpClient>,
cancel: CancellationToken,
) {
let interval = Duration::from_secs(60);
loop {
tokio::select! {
() = cancel.cancelled() => {
tracing::debug!("Token refresh loop cancelled");
return;
}
() = tokio::time::sleep(interval) => {}
}
match token_manager.refresh_if_needed().await {
Ok(()) => {
let token = token_manager
.tokens_lock()
.read()
.await
.access_token
.clone();
x_client.set_access_token(token).await;
}
Err(crate::error::XApiError::AuthExpired) => {
tracing::error!(
"Token refresh failed: authentication expired. \
Run `tuitbot auth` to re-authenticate. Shutting down."
);
cancel.cancel();
return;
}
Err(e) => {
tracing::warn!(error = %e, "Token refresh attempt failed, will retry next cycle");
}
}
}
}
/// Supervisor for spawned automation tasks: holds one cancellation token
/// and the join handles of every task started through `spawn`.
pub struct Runtime {
/// Token cancelled by `shutdown`; clones are handed out by `cancel_token`.
cancel: CancellationToken,
/// `(task name, join handle)` pairs, pushed by `spawn` and drained by `shutdown`.
handles: Vec<(String, JoinHandle<()>)>,
}
impl Runtime {
pub fn new() -> Self {
Self {
cancel: CancellationToken::new(),
handles: Vec::new(),
}
}
pub fn cancel_token(&self) -> CancellationToken {
self.cancel.clone()
}
pub fn spawn<F>(&mut self, name: impl Into<String>, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
let name = name.into();
tracing::info!(task = %name, "Spawning automation task");
let handle = tokio::spawn(future);
self.handles.push((name, handle));
}
pub fn task_count(&self) -> usize {
self.handles.len()
}
pub async fn shutdown(&mut self) {
tracing::info!("Initiating graceful shutdown");
self.cancel.cancel();
let timeout_duration = Duration::from_secs(30);
let handles: Vec<_> = self.handles.drain(..).collect();
let shutdown = async {
for (name, handle) in handles {
match handle.await {
Ok(()) => tracing::info!(task = %name, "Task completed cleanly"),
Err(e) => {
tracing::warn!(task = %name, error = %e, "Task panicked during shutdown")
}
}
}
};
if tokio::time::timeout(timeout_duration, shutdown)
.await
.is_err()
{
tracing::warn!("Shutdown timeout exceeded (30s), some tasks may still be running");
} else {
tracing::info!("Graceful shutdown complete");
}
}
pub async fn run_until_shutdown(mut self) {
wait_for_shutdown_signal().await;
self.shutdown().await;
}
}
impl Default for Runtime {
fn default() -> Self {
Self::new()
}
}
/// Waits for an OS shutdown signal and then returns.
///
/// * On Unix, resolves on whichever arrives first: Ctrl+C or SIGTERM. If the
///   SIGTERM handler cannot be registered, a warning is logged and only
///   Ctrl+C is awaited.
/// * On other platforms, only Ctrl+C is awaited.
///
/// If listening for Ctrl+C fails, the error is logged and the function
/// returns without reporting that Ctrl+C was received.
pub async fn wait_for_shutdown_signal() {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(stream) => stream,
            Err(e) => {
                tracing::warn!(
                    error = %e,
                    "Failed to register SIGTERM handler, using Ctrl+C only"
                );
                if let Err(e) = tokio::signal::ctrl_c().await {
                    tracing::error!(error = %e, "Failed to listen for Ctrl+C");
                } else {
                    tracing::info!("Received Ctrl+C");
                }
                return;
            }
        };

        tokio::select! {
            result = tokio::signal::ctrl_c() => {
                // Only report Ctrl+C when the listener actually succeeded, as
                // the fallback and non-Unix paths do; a failed listener must
                // not be logged as a received signal.
                if let Err(e) = result {
                    tracing::error!(error = %e, "Ctrl+C handler error");
                } else {
                    tracing::info!("Received Ctrl+C");
                }
            }
            _ = sigterm.recv() => {
                tracing::info!("Received SIGTERM");
            }
        }
    }

    #[cfg(not(unix))]
    {
        if let Err(e) = tokio::signal::ctrl_c().await {
            tracing::error!(error = %e, "Failed to listen for Ctrl+C");
        } else {
            tracing::info!("Received Ctrl+C");
        }
    }
}
// Unit tests for `Runtime`: spawning, cancellation, shutdown and defaults.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
// A spawned task starts running, is tracked, and is no longer tracked
// once `shutdown` has returned.
#[tokio::test]
async fn spawn_and_cancel() {
let mut runtime = Runtime::new();
let cancel = runtime.cancel_token();
let ran = Arc::new(AtomicBool::new(false));
let ran_clone = ran.clone();
runtime.spawn("test-task", async move {
// Record that the task body started, then park until cancelled.
ran_clone.store(true, Ordering::SeqCst);
cancel.cancelled().await;
});
assert_eq!(runtime.task_count(), 1);
// Give the spawned task a moment to be polled.
tokio::time::sleep(Duration::from_millis(20)).await;
assert!(ran.load(Ordering::SeqCst));
runtime.shutdown().await;
assert_eq!(runtime.task_count(), 0);
}
// Five tasks sharing the runtime's token all start, and all handles are
// drained by a single `shutdown`.
#[tokio::test]
async fn multiple_tasks_all_stopped() {
let mut runtime = Runtime::new();
let counter = Arc::new(AtomicU32::new(0));
for i in 0..5 {
let cancel = runtime.cancel_token();
let counter_clone = counter.clone();
runtime.spawn(format!("task-{i}"), async move {
counter_clone.fetch_add(1, Ordering::SeqCst);
cancel.cancelled().await;
});
}
assert_eq!(runtime.task_count(), 5);
// Give every task a chance to run its increment.
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(counter.load(Ordering::SeqCst), 5);
runtime.shutdown().await;
assert_eq!(runtime.task_count(), 0);
}
// `shutdown` returns as soon as the tasks finish instead of waiting for
// its full timeout.
#[tokio::test]
async fn shutdown_completes_quickly_for_fast_tasks() {
let mut runtime = Runtime::new();
let cancel = runtime.cancel_token();
runtime.spawn("quick-task", async move {
cancel.cancelled().await;
// Simulate a short cleanup after cancellation.
tokio::time::sleep(Duration::from_millis(10)).await;
});
let start = tokio::time::Instant::now();
runtime.shutdown().await;
let elapsed = start.elapsed();
assert!(elapsed < Duration::from_secs(1));
}
// `shutdown` must not hang or panic when a task has already finished
// before shutdown starts.
#[tokio::test]
async fn shutdown_handles_already_completed_tasks() {
let mut runtime = Runtime::new();
runtime.spawn("instant-task", async {
});
// Let the empty task run to completion first.
tokio::time::sleep(Duration::from_millis(20)).await;
runtime.shutdown().await;
}
// Tokens returned by `cancel_token` share state: cancelling one is
// observed by the other.
#[tokio::test]
async fn cancel_token_is_shared() {
let runtime = Runtime::new();
let t1 = runtime.cancel_token();
let t2 = runtime.cancel_token();
assert!(!t1.is_cancelled());
assert!(!t2.is_cancelled());
t1.cancel();
assert!(t1.is_cancelled());
assert!(t2.is_cancelled());
}
// `Runtime::default()` yields an empty runtime whose token is not cancelled.
#[tokio::test]
async fn default_impl() {
let runtime = Runtime::default();
assert_eq!(runtime.task_count(), 0);
assert!(!runtime.cancel_token().is_cancelled());
}
}