Skip to main content

tuitbot_core/automation/target_loop/
mod.rs

1//! Target account monitoring loop.
2//!
3//! Fetches recent tweets from configured target accounts, scores them
4//! with adjusted weights (preferring recency and low reply count), and
5//! generates relationship-based replies. This loop operates independently
6//! from keyword-based discovery to enable genuine engagement with specific
7//! people.
8
9use super::loop_helpers::{
10    ConsecutiveErrorTracker, LoopError, LoopTweet, PostSender, ReplyGenerator, SafetyChecker,
11};
12use super::schedule::{schedule_gate, ActiveSchedule};
13use super::scheduler::LoopScheduler;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio_util::sync::CancellationToken;
17
18// ============================================================================
19// Port traits specific to target loop
20// ============================================================================
21
22/// Fetches tweets from a specific user by user ID.
23#[async_trait::async_trait]
24pub trait TargetTweetFetcher: Send + Sync {
25    /// Fetch recent tweets from the given user.
26    async fn fetch_user_tweets(&self, user_id: &str) -> Result<Vec<LoopTweet>, LoopError>;
27}
28
29/// Looks up a user by username.
30#[async_trait::async_trait]
31pub trait TargetUserManager: Send + Sync {
32    /// Look up a user by username. Returns (user_id, username).
33    async fn lookup_user(&self, username: &str) -> Result<(String, String), LoopError>;
34}
35
36/// Storage operations for target account state.
37#[allow(clippy::too_many_arguments)]
38#[async_trait::async_trait]
39pub trait TargetStorage: Send + Sync {
40    /// Upsert a target account record.
41    async fn upsert_target_account(
42        &self,
43        account_id: &str,
44        username: &str,
45    ) -> Result<(), LoopError>;
46
47    /// Check if a target tweet already exists.
48    async fn target_tweet_exists(&self, tweet_id: &str) -> Result<bool, LoopError>;
49
50    /// Store a discovered target tweet.
51    async fn store_target_tweet(
52        &self,
53        tweet_id: &str,
54        account_id: &str,
55        content: &str,
56        created_at: &str,
57        reply_count: i64,
58        like_count: i64,
59        relevance_score: f64,
60    ) -> Result<(), LoopError>;
61
62    /// Mark a target tweet as replied to.
63    async fn mark_target_tweet_replied(&self, tweet_id: &str) -> Result<(), LoopError>;
64
65    /// Record a reply to a target account (increments counter).
66    async fn record_target_reply(&self, account_id: &str) -> Result<(), LoopError>;
67
68    /// Get count of target replies sent today.
69    async fn count_target_replies_today(&self) -> Result<i64, LoopError>;
70
71    /// Log an action.
72    async fn log_action(
73        &self,
74        action_type: &str,
75        status: &str,
76        message: &str,
77    ) -> Result<(), LoopError>;
78}
79
80// ============================================================================
81// Target loop config
82// ============================================================================
83
84/// Configuration for the target monitoring loop.
85#[derive(Debug, Clone)]
86pub struct TargetLoopConfig {
87    /// Target account usernames (without @).
88    pub accounts: Vec<String>,
89    /// Maximum target replies per day.
90    pub max_target_replies_per_day: u32,
91    /// Whether this is a dry run.
92    pub dry_run: bool,
93}
94
95// ============================================================================
96// Target loop result
97// ============================================================================
98
99/// Result of processing a single target tweet.
100#[derive(Debug)]
101pub enum TargetResult {
102    /// Reply was sent (or would be in dry-run).
103    Replied {
104        tweet_id: String,
105        account: String,
106        reply_text: String,
107    },
108    /// Tweet was skipped.
109    Skipped { tweet_id: String, reason: String },
110    /// Processing failed.
111    Failed { tweet_id: String, error: String },
112}
113
114// ============================================================================
115// Target loop
116// ============================================================================
117
118/// Monitors target accounts and generates relationship-based replies.
119pub struct TargetLoop {
120    fetcher: Arc<dyn TargetTweetFetcher>,
121    user_mgr: Arc<dyn TargetUserManager>,
122    generator: Arc<dyn ReplyGenerator>,
123    safety: Arc<dyn SafetyChecker>,
124    storage: Arc<dyn TargetStorage>,
125    poster: Arc<dyn PostSender>,
126    config: TargetLoopConfig,
127}
128
129impl TargetLoop {
130    /// Create a new target monitoring loop.
131    #[allow(clippy::too_many_arguments)]
132    pub fn new(
133        fetcher: Arc<dyn TargetTweetFetcher>,
134        user_mgr: Arc<dyn TargetUserManager>,
135        generator: Arc<dyn ReplyGenerator>,
136        safety: Arc<dyn SafetyChecker>,
137        storage: Arc<dyn TargetStorage>,
138        poster: Arc<dyn PostSender>,
139        config: TargetLoopConfig,
140    ) -> Self {
141        Self {
142            fetcher,
143            user_mgr,
144            generator,
145            safety,
146            storage,
147            poster,
148            config,
149        }
150    }
151
152    /// Run the continuous target monitoring loop until cancellation.
153    pub async fn run(
154        &self,
155        cancel: CancellationToken,
156        scheduler: LoopScheduler,
157        schedule: Option<Arc<ActiveSchedule>>,
158    ) {
159        tracing::info!(
160            dry_run = self.config.dry_run,
161            accounts = self.config.accounts.len(),
162            max_replies = self.config.max_target_replies_per_day,
163            "Target monitoring loop started"
164        );
165
166        if self.config.accounts.is_empty() {
167            tracing::info!("No target accounts configured, target loop has nothing to do");
168            cancel.cancelled().await;
169            return;
170        }
171
172        let mut error_tracker = ConsecutiveErrorTracker::new(10, Duration::from_secs(300));
173
174        loop {
175            if cancel.is_cancelled() {
176                break;
177            }
178
179            if !schedule_gate(&schedule, &cancel).await {
180                break;
181            }
182
183            match self.run_iteration().await {
184                Ok(results) => {
185                    error_tracker.record_success();
186                    let replied = results
187                        .iter()
188                        .filter(|r| matches!(r, TargetResult::Replied { .. }))
189                        .count();
190                    let skipped = results
191                        .iter()
192                        .filter(|r| matches!(r, TargetResult::Skipped { .. }))
193                        .count();
194                    if !results.is_empty() {
195                        tracing::info!(
196                            total = results.len(),
197                            replied = replied,
198                            skipped = skipped,
199                            "Target iteration complete"
200                        );
201                    }
202                }
203                Err(e) => {
204                    let should_pause = error_tracker.record_error();
205                    tracing::warn!(
206                        error = %e,
207                        consecutive_errors = error_tracker.count(),
208                        "Target iteration failed"
209                    );
210
211                    if should_pause {
212                        tracing::warn!(
213                            pause_secs = error_tracker.pause_duration().as_secs(),
214                            "Pausing target loop due to consecutive errors"
215                        );
216                        tokio::select! {
217                            _ = cancel.cancelled() => break,
218                            _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
219                        }
220                        error_tracker.reset();
221                        continue;
222                    }
223                }
224            }
225
226            tokio::select! {
227                _ = cancel.cancelled() => break,
228                _ = scheduler.tick() => {},
229            }
230        }
231
232        tracing::info!("Target monitoring loop stopped");
233    }
234
235    /// Run a single iteration across all target accounts.
236    pub async fn run_iteration(&self) -> Result<Vec<TargetResult>, LoopError> {
237        let mut all_results = Vec::new();
238
239        // Check daily limit
240        let replies_today = self.storage.count_target_replies_today().await?;
241        if replies_today >= self.config.max_target_replies_per_day as i64 {
242            tracing::debug!(
243                replies_today = replies_today,
244                limit = self.config.max_target_replies_per_day,
245                "Target reply daily limit reached"
246            );
247            return Ok(all_results);
248        }
249
250        let mut remaining_replies =
251            (self.config.max_target_replies_per_day as i64 - replies_today) as usize;
252
253        for username in &self.config.accounts {
254            if remaining_replies == 0 {
255                break;
256            }
257
258            match self.process_account(username, remaining_replies).await {
259                Ok(results) => {
260                    let replied_count = results
261                        .iter()
262                        .filter(|r| matches!(r, TargetResult::Replied { .. }))
263                        .count();
264                    remaining_replies = remaining_replies.saturating_sub(replied_count);
265                    all_results.extend(results);
266                }
267                Err(e) => {
268                    // AuthExpired is global — stop immediately instead of
269                    // failing N times with the same 401.
270                    if matches!(e, LoopError::AuthExpired) {
271                        tracing::error!(
272                            username = %username,
273                            "X API authentication expired, re-authenticate with `tuitbot init`"
274                        );
275                        return Err(e);
276                    }
277
278                    tracing::warn!(
279                        username = %username,
280                        error = %e,
281                        "Failed to process target account"
282                    );
283                }
284            }
285        }
286
287        Ok(all_results)
288    }
289
290    /// Process a single target account: resolve, fetch tweets, reply.
291    async fn process_account(
292        &self,
293        username: &str,
294        max_replies: usize,
295    ) -> Result<Vec<TargetResult>, LoopError> {
296        // Look up user
297        let (user_id, resolved_username) = self.user_mgr.lookup_user(username).await?;
298
299        // Upsert target account record
300        self.storage
301            .upsert_target_account(&user_id, &resolved_username)
302            .await?;
303
304        // Fetch recent tweets
305        let tweets = self.fetcher.fetch_user_tweets(&user_id).await?;
306        tracing::info!(
307            username = %resolved_username,
308            count = tweets.len(),
309            "Monitoring @{}, found {} new tweets",
310            resolved_username,
311            tweets.len(),
312        );
313
314        let mut results = Vec::new();
315
316        for tweet in tweets.iter().take(max_replies) {
317            let result = self
318                .process_target_tweet(tweet, &user_id, &resolved_username)
319                .await;
320            if matches!(result, TargetResult::Replied { .. }) {
321                results.push(result);
322                // Only reply to one tweet per account per iteration
323                break;
324            }
325            results.push(result);
326        }
327
328        Ok(results)
329    }
330
331    /// Process a single target tweet: dedup, safety check, generate reply, post.
332    async fn process_target_tweet(
333        &self,
334        tweet: &LoopTweet,
335        account_id: &str,
336        username: &str,
337    ) -> TargetResult {
338        // Check if already seen
339        match self.storage.target_tweet_exists(&tweet.id).await {
340            Ok(true) => {
341                return TargetResult::Skipped {
342                    tweet_id: tweet.id.clone(),
343                    reason: "already discovered".to_string(),
344                };
345            }
346            Ok(false) => {}
347            Err(e) => {
348                tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to check target tweet");
349            }
350        }
351
352        // Store the discovered tweet
353        let _ = self
354            .storage
355            .store_target_tweet(
356                &tweet.id,
357                account_id,
358                &tweet.text,
359                &tweet.created_at,
360                tweet.replies as i64,
361                tweet.likes as i64,
362                0.0,
363            )
364            .await;
365
366        // Safety checks
367        if self.safety.has_replied_to(&tweet.id).await {
368            return TargetResult::Skipped {
369                tweet_id: tweet.id.clone(),
370                reason: "already replied".to_string(),
371            };
372        }
373
374        if !self.safety.can_reply().await {
375            return TargetResult::Skipped {
376                tweet_id: tweet.id.clone(),
377                reason: "rate limited".to_string(),
378            };
379        }
380
381        // Generate reply with vault context (no product mention — genuine engagement)
382        let reply_output = match self
383            .generator
384            .generate_reply_with_rag(&tweet.text, username, false)
385            .await
386        {
387            Ok(output) => output,
388            Err(e) => {
389                return TargetResult::Failed {
390                    tweet_id: tweet.id.clone(),
391                    error: e.to_string(),
392                };
393            }
394        };
395        let reply_text = reply_output.text;
396
397        tracing::info!(
398            username = %username,
399            "Replied to target @{}",
400            username,
401        );
402
403        if self.config.dry_run {
404            tracing::info!(
405                "DRY RUN: Target @{} tweet {} -- Would reply: \"{}\"",
406                username,
407                tweet.id,
408                reply_text
409            );
410
411            let _ = self
412                .storage
413                .log_action(
414                    "target_reply",
415                    "dry_run",
416                    &format!("Reply to @{username}: {}", truncate(&reply_text, 50)),
417                )
418                .await;
419        } else {
420            if let Err(e) = self.poster.send_reply(&tweet.id, &reply_text).await {
421                return TargetResult::Failed {
422                    tweet_id: tweet.id.clone(),
423                    error: e.to_string(),
424                };
425            }
426
427            if let Err(e) = self.safety.record_reply(&tweet.id, &reply_text).await {
428                tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to record reply");
429            }
430
431            // Mark tweet as replied and update account stats
432            let _ = self.storage.mark_target_tweet_replied(&tweet.id).await;
433            let _ = self.storage.record_target_reply(account_id).await;
434
435            let _ = self
436                .storage
437                .log_action(
438                    "target_reply",
439                    "success",
440                    &format!("Replied to @{username}: {}", truncate(&reply_text, 50)),
441                )
442                .await;
443        }
444
445        TargetResult::Replied {
446            tweet_id: tweet.id.clone(),
447            account: username.to_string(),
448            reply_text,
449        }
450    }
451}
452
453/// Truncate a string for display.
454pub(crate) fn truncate(s: &str, max_len: usize) -> String {
455    if s.len() <= max_len {
456        s.to_string()
457    } else {
458        format!("{}...", &s[..max_len])
459    }
460}
461
462#[cfg(test)]
463mod tests;