1use super::loop_helpers::{
10 ConsecutiveErrorTracker, LoopError, LoopTweet, PostSender, ReplyGenerator, SafetyChecker,
11};
12use super::schedule::{schedule_gate, ActiveSchedule};
13use super::scheduler::LoopScheduler;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio_util::sync::CancellationToken;
17
18#[async_trait::async_trait]
24pub trait TargetTweetFetcher: Send + Sync {
25 async fn fetch_user_tweets(&self, user_id: &str) -> Result<Vec<LoopTweet>, LoopError>;
27}
28
29#[async_trait::async_trait]
31pub trait TargetUserManager: Send + Sync {
32 async fn lookup_user(&self, username: &str) -> Result<(String, String), LoopError>;
34}
35
36#[allow(clippy::too_many_arguments)]
38#[async_trait::async_trait]
39pub trait TargetStorage: Send + Sync {
40 async fn upsert_target_account(
42 &self,
43 account_id: &str,
44 username: &str,
45 ) -> Result<(), LoopError>;
46
47 async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError>;
49
50 async fn store_target_tweet(
52 &self,
53 tweet_id: &str,
54 account_id: &str,
55 content: &str,
56 created_at: &str,
57 reply_count: i64,
58 like_count: i64,
59 relevance_score: f64,
60 ) -> Result<(), LoopError>;
61
62 async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError>;
64
65 async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError>;
67
68 async fn count_target_replies_today(&self) -> Result<i64, LoopError>;
70
71 async fn log_action(
73 &self,
74 action_type: &str,
75 status: &str,
76 message: &str,
77 ) -> Result<(), LoopError>;
78}
79
80#[derive(Debug, Clone)]
86pub struct TargetLoopConfig {
87 pub accounts: Vec<String>,
89 pub max_target_replies_per_day: u32,
91 pub dry_run: bool,
93}
94
95#[derive(Debug)]
101pub enum TargetResult {
102 Replied {
104 tweet_id: String,
105 account: String,
106 reply_text: String,
107 },
108 Skipped { tweet_id: String, reason: String },
110 Failed { tweet_id: String, error: String },
112}
113
114pub struct TargetLoop {
120 fetcher: Arc<dyn TargetTweetFetcher>,
121 user_mgr: Arc<dyn TargetUserManager>,
122 generator: Arc<dyn ReplyGenerator>,
123 safety: Arc<dyn SafetyChecker>,
124 storage: Arc<dyn TargetStorage>,
125 poster: Arc<dyn PostSender>,
126 config: TargetLoopConfig,
127}
128
129impl TargetLoop {
130 #[allow(clippy::too_many_arguments)]
132 pub fn new(
133 fetcher: Arc<dyn TargetTweetFetcher>,
134 user_mgr: Arc<dyn TargetUserManager>,
135 generator: Arc<dyn ReplyGenerator>,
136 safety: Arc<dyn SafetyChecker>,
137 storage: Arc<dyn TargetStorage>,
138 poster: Arc<dyn PostSender>,
139 config: TargetLoopConfig,
140 ) -> Self {
141 Self {
142 fetcher,
143 user_mgr,
144 generator,
145 safety,
146 storage,
147 poster,
148 config,
149 }
150 }
151
152 pub async fn run(
154 &self,
155 cancel: CancellationToken,
156 scheduler: LoopScheduler,
157 schedule: Option<Arc<ActiveSchedule>>,
158 ) {
159 tracing::info!(
160 dry_run = self.config.dry_run,
161 accounts = self.config.accounts.len(),
162 max_replies = self.config.max_target_replies_per_day,
163 "Target monitoring loop started"
164 );
165
166 if self.config.accounts.is_empty() {
167 tracing::info!("No target accounts configured, target loop has nothing to do");
168 cancel.cancelled().await;
169 return;
170 }
171
172 let mut error_tracker = ConsecutiveErrorTracker::new(10, Duration::from_secs(300));
173
174 loop {
175 if cancel.is_cancelled() {
176 break;
177 }
178
179 if !schedule_gate(&schedule, &cancel).await {
180 break;
181 }
182
183 match self.run_iteration().await {
184 Ok(results) => {
185 error_tracker.record_success();
186 let replied = results
187 .iter()
188 .filter(|r| matches!(r, TargetResult::Replied { .. }))
189 .count();
190 let skipped = results
191 .iter()
192 .filter(|r| matches!(r, TargetResult::Skipped { .. }))
193 .count();
194 if !results.is_empty() {
195 tracing::info!(
196 total = results.len(),
197 replied = replied,
198 skipped = skipped,
199 "Target iteration complete"
200 );
201 }
202 }
203 Err(e) => {
204 let should_pause = error_tracker.record_error();
205 tracing::warn!(
206 error = %e,
207 consecutive_errors = error_tracker.count(),
208 "Target iteration failed"
209 );
210
211 if should_pause {
212 tracing::warn!(
213 pause_secs = error_tracker.pause_duration().as_secs(),
214 "Pausing target loop due to consecutive errors"
215 );
216 tokio::select! {
217 _ = cancel.cancelled() => break,
218 _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
219 }
220 error_tracker.reset();
221 continue;
222 }
223 }
224 }
225
226 tokio::select! {
227 _ = cancel.cancelled() => break,
228 _ = scheduler.tick() => {},
229 }
230 }
231
232 tracing::info!("Target monitoring loop stopped");
233 }
234
235 pub async fn run_iteration(&self) -> Result<Vec<TargetResult>, LoopError> {
237 let mut all_results = Vec::new();
238
239 let replies_today = self.storage.count_target_replies_today().await?;
241 if replies_today >= self.config.max_target_replies_per_day as i64 {
242 tracing::debug!(
243 replies_today = replies_today,
244 limit = self.config.max_target_replies_per_day,
245 "Target reply daily limit reached"
246 );
247 return Ok(all_results);
248 }
249
250 let mut remaining_replies =
251 (self.config.max_target_replies_per_day as i64 - replies_today) as usize;
252
253 for username in &self.config.accounts {
254 if remaining_replies == 0 {
255 break;
256 }
257
258 match self.process_account(username, remaining_replies).await {
259 Ok(results) => {
260 let replied_count = results
261 .iter()
262 .filter(|r| matches!(r, TargetResult::Replied { .. }))
263 .count();
264 remaining_replies = remaining_replies.saturating_sub(replied_count);
265 all_results.extend(results);
266 }
267 Err(e) => {
268 if matches!(e, LoopError::AuthExpired) {
271 tracing::error!(
272 username = %username,
273 "X API authentication expired, re-authenticate with `tuitbot init`"
274 );
275 return Err(e);
276 }
277
278 tracing::warn!(
279 username = %username,
280 error = %e,
281 "Failed to process target account"
282 );
283 }
284 }
285 }
286
287 Ok(all_results)
288 }
289
290 async fn process_account(
292 &self,
293 username: &str,
294 max_replies: usize,
295 ) -> Result<Vec<TargetResult>, LoopError> {
296 let (user_id, resolved_username) = self.user_mgr.lookup_user(username).await?;
298
299 self.storage
301 .upsert_target_account(&user_id, &resolved_username)
302 .await?;
303
304 let tweets = self.fetcher.fetch_user_tweets(&user_id).await?;
306 tracing::info!(
307 username = %resolved_username,
308 count = tweets.len(),
309 "Monitoring @{}, found {} new tweets",
310 resolved_username,
311 tweets.len(),
312 );
313
314 let mut results = Vec::new();
315
316 for tweet in tweets.iter().take(max_replies) {
317 let result = self
318 .process_target_tweet(tweet, &user_id, &resolved_username)
319 .await;
320 if matches!(result, TargetResult::Replied { .. }) {
321 results.push(result);
322 break;
324 }
325 results.push(result);
326 }
327
328 Ok(results)
329 }
330
331 async fn process_target_tweet(
333 &self,
334 tweet: &LoopTweet,
335 account_id: &str,
336 username: &str,
337 ) -> TargetResult {
338 match self.storage.target_tweet_exists(&tweet.id).await {
340 Ok(true) => {
341 return TargetResult::Skipped {
342 tweet_id: tweet.id.clone(),
343 reason: "already discovered".to_string(),
344 };
345 }
346 Ok(false) => {}
347 Err(e) => {
348 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to check target tweet");
349 }
350 }
351
352 let _ = self
354 .storage
355 .store_target_tweet(
356 &tweet.id,
357 account_id,
358 &tweet.text,
359 &tweet.created_at,
360 tweet.replies as i64,
361 tweet.likes as i64,
362 0.0,
363 )
364 .await;
365
366 if self.safety.has_replied_to(&tweet.id).await {
368 return TargetResult::Skipped {
369 tweet_id: tweet.id.clone(),
370 reason: "already replied".to_string(),
371 };
372 }
373
374 if !self.safety.can_reply().await {
375 return TargetResult::Skipped {
376 tweet_id: tweet.id.clone(),
377 reason: "rate limited".to_string(),
378 };
379 }
380
381 let reply_output = match self
383 .generator
384 .generate_reply_with_rag(&tweet.text, username, false)
385 .await
386 {
387 Ok(output) => output,
388 Err(e) => {
389 return TargetResult::Failed {
390 tweet_id: tweet.id.clone(),
391 error: e.to_string(),
392 };
393 }
394 };
395 let reply_text = reply_output.text;
396
397 tracing::info!(
398 username = %username,
399 "Replied to target @{}",
400 username,
401 );
402
403 if self.config.dry_run {
404 tracing::info!(
405 "DRY RUN: Target @{} tweet {} -- Would reply: \"{}\"",
406 username,
407 tweet.id,
408 reply_text
409 );
410
411 let _ = self
412 .storage
413 .log_action(
414 "target_reply",
415 "dry_run",
416 &format!("Reply to @{username}: {}", truncate(&reply_text, 50)),
417 )
418 .await;
419 } else {
420 if let Err(e) = self.poster.send_reply(&tweet.id, &reply_text).await {
421 return TargetResult::Failed {
422 tweet_id: tweet.id.clone(),
423 error: e.to_string(),
424 };
425 }
426
427 if let Err(e) = self.safety.record_reply(&tweet.id, &reply_text).await {
428 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to record reply");
429 }
430
431 let _ = self.storage.mark_target_tweet_replied(&tweet.id).await;
433 let _ = self.storage.record_target_reply(account_id).await;
434
435 let _ = self
436 .storage
437 .log_action(
438 "target_reply",
439 "success",
440 &format!("Replied to @{username}: {}", truncate(&reply_text, 50)),
441 )
442 .await;
443 }
444
445 TargetResult::Replied {
446 tweet_id: tweet.id.clone(),
447 account: username.to_string(),
448 reply_text,
449 }
450 }
451}
452
453pub(crate) fn truncate(s: &str, max_len: usize) -> String {
455 if s.len() <= max_len {
456 s.to_string()
457 } else {
458 format!("{}...", &s[..max_len])
459 }
460}
461
462#[cfg(test)]
463mod tests;