1use super::loop_helpers::{
9 ConsecutiveErrorTracker, LoopError, LoopStorage, LoopTweet, PostSender, ReplyGenerator,
10 SafetyChecker, TweetScorer, TweetSearcher,
11};
12use super::schedule::{schedule_gate, ActiveSchedule};
13use super::scheduler::LoopScheduler;
14use std::sync::Arc;
15use std::time::Duration;
16use tokio_util::sync::CancellationToken;
17
18pub struct DiscoveryLoop {
20 searcher: Arc<dyn TweetSearcher>,
21 scorer: Arc<dyn TweetScorer>,
22 generator: Arc<dyn ReplyGenerator>,
23 safety: Arc<dyn SafetyChecker>,
24 storage: Arc<dyn LoopStorage>,
25 poster: Arc<dyn PostSender>,
26 keywords: Vec<String>,
27 threshold: f32,
28 dry_run: bool,
29}
30
/// Outcome of processing a single discovered tweet.
#[derive(Debug)]
pub enum DiscoveryResult {
    /// A reply was generated and sent (in dry-run mode: generated and logged only).
    Replied {
        /// ID of the tweet that was answered.
        tweet_id: String,
        /// Username of the tweet's author.
        author: String,
        /// Score assigned by the scorer.
        score: f32,
        /// Text of the generated reply.
        reply_text: String,
    },
    /// The scorer reported that the tweet does not meet the threshold.
    BelowThreshold { tweet_id: String, score: f32 },
    /// The tweet was passed over; `reason` is a short human-readable explanation.
    Skipped { tweet_id: String, reason: String },
    /// Reply generation or sending failed; `error` holds the error's display text.
    Failed { tweet_id: String, error: String },
}
48
/// Aggregate counters for one or more discovery passes.
#[derive(Debug, Default)]
pub struct DiscoverySummary {
    /// Tweets returned by search, counted before any processing limit is applied.
    pub tweets_found: usize,
    /// Tweets counted as qualifying; `search_and_process` increments this only
    /// for `DiscoveryResult::Replied` outcomes.
    pub qualifying: usize,
    /// Tweets that were replied to (or would have been, in dry-run mode).
    pub replied: usize,
    /// Tweets skipped, including those that scored below the threshold.
    pub skipped: usize,
    /// Tweets whose reply generation or sending failed.
    pub failed: usize,
}
63
64impl DiscoveryLoop {
65 #[allow(clippy::too_many_arguments)]
67 pub fn new(
68 searcher: Arc<dyn TweetSearcher>,
69 scorer: Arc<dyn TweetScorer>,
70 generator: Arc<dyn ReplyGenerator>,
71 safety: Arc<dyn SafetyChecker>,
72 storage: Arc<dyn LoopStorage>,
73 poster: Arc<dyn PostSender>,
74 keywords: Vec<String>,
75 threshold: f32,
76 dry_run: bool,
77 ) -> Self {
78 Self {
79 searcher,
80 scorer,
81 generator,
82 safety,
83 storage,
84 poster,
85 keywords,
86 threshold,
87 dry_run,
88 }
89 }
90
91 pub async fn run(
95 &self,
96 cancel: CancellationToken,
97 scheduler: LoopScheduler,
98 schedule: Option<Arc<ActiveSchedule>>,
99 ) {
100 tracing::info!(
101 dry_run = self.dry_run,
102 keywords = self.keywords.len(),
103 threshold = self.threshold,
104 "Discovery loop started"
105 );
106
107 if self.keywords.is_empty() {
108 tracing::warn!("No keywords configured, discovery loop has nothing to search");
109 cancel.cancelled().await;
110 return;
111 }
112
113 let mut error_tracker = ConsecutiveErrorTracker::new(10, Duration::from_secs(300));
114 let mut keyword_index = 0usize;
115
116 loop {
117 if cancel.is_cancelled() {
118 break;
119 }
120
121 if !schedule_gate(&schedule, &cancel).await {
122 break;
123 }
124
125 let keyword = &self.keywords[keyword_index % self.keywords.len()];
127 keyword_index += 1;
128
129 match self.search_and_process(keyword, None).await {
130 Ok((_results, summary)) => {
131 error_tracker.record_success();
132 if summary.tweets_found > 0 {
133 tracing::info!(
134 keyword = %keyword,
135 found = summary.tweets_found,
136 qualifying = summary.qualifying,
137 replied = summary.replied,
138 "Discovery iteration complete"
139 );
140 }
141 }
142 Err(e) => {
143 let should_pause = error_tracker.record_error();
144 tracing::warn!(
145 keyword = %keyword,
146 error = %e,
147 consecutive_errors = error_tracker.count(),
148 "Discovery iteration failed"
149 );
150
151 if should_pause {
152 tracing::warn!(
153 pause_secs = error_tracker.pause_duration().as_secs(),
154 "Pausing discovery loop due to consecutive errors"
155 );
156 tokio::select! {
157 _ = cancel.cancelled() => break,
158 _ = tokio::time::sleep(error_tracker.pause_duration()) => {},
159 }
160 error_tracker.reset();
161 continue;
162 }
163
164 if let LoopError::RateLimited { retry_after } = &e {
165 let backoff = super::loop_helpers::rate_limit_backoff(*retry_after, 0);
166 tokio::select! {
167 _ = cancel.cancelled() => break,
168 _ = tokio::time::sleep(backoff) => {},
169 }
170 continue;
171 }
172 }
173 }
174
175 tokio::select! {
176 _ = cancel.cancelled() => break,
177 _ = scheduler.tick() => {},
178 }
179 }
180
181 tracing::info!("Discovery loop stopped");
182 }
183
184 pub async fn run_once(
189 &self,
190 limit: Option<usize>,
191 ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
192 let mut all_results = Vec::new();
193 let mut summary = DiscoverySummary::default();
194 let mut total_processed = 0usize;
195 let mut last_error: Option<LoopError> = None;
196 let mut any_success = false;
197
198 for keyword in &self.keywords {
199 if let Some(max) = limit {
200 if total_processed >= max {
201 break;
202 }
203 }
204
205 let remaining = limit.map(|max| max.saturating_sub(total_processed));
206 match self.search_and_process(keyword, remaining).await {
207 Ok((results, iter_summary)) => {
208 any_success = true;
209 summary.tweets_found += iter_summary.tweets_found;
210 summary.qualifying += iter_summary.qualifying;
211 summary.replied += iter_summary.replied;
212 summary.skipped += iter_summary.skipped;
213 summary.failed += iter_summary.failed;
214 total_processed += iter_summary.tweets_found;
215 all_results.extend(results);
216 }
217 Err(e) => {
218 tracing::warn!(keyword = %keyword, error = %e, "Search failed for keyword");
219 last_error = Some(e);
220 }
221 }
222 }
223
224 if !any_success {
227 if let Some(err) = last_error {
228 return Err(err);
229 }
230 }
231
232 Ok((all_results, summary))
233 }
234
235 pub(crate) async fn search_and_process(
237 &self,
238 keyword: &str,
239 limit: Option<usize>,
240 ) -> Result<(Vec<DiscoveryResult>, DiscoverySummary), LoopError> {
241 tracing::info!(keyword = %keyword, "Searching keyword");
242 let tweets = self.searcher.search_tweets(keyword).await?;
243
244 let mut summary = DiscoverySummary {
245 tweets_found: tweets.len(),
246 ..Default::default()
247 };
248
249 let to_process = match limit {
250 Some(n) => &tweets[..tweets.len().min(n)],
251 None => &tweets,
252 };
253
254 let mut results = Vec::with_capacity(to_process.len());
255
256 for tweet in to_process {
257 let result = self.process_tweet(tweet, keyword).await;
258
259 match &result {
260 DiscoveryResult::Replied { .. } => {
261 summary.qualifying += 1;
262 summary.replied += 1;
263 }
264 DiscoveryResult::BelowThreshold { .. } => {
265 summary.skipped += 1;
266 }
267 DiscoveryResult::Skipped { .. } => {
268 summary.skipped += 1;
269 }
270 DiscoveryResult::Failed { .. } => {
271 summary.failed += 1;
272 }
273 }
274
275 results.push(result);
276 }
277
278 Ok((results, summary))
279 }
280
281 pub(crate) async fn process_tweet(&self, tweet: &LoopTweet, keyword: &str) -> DiscoveryResult {
283 match self.storage.tweet_exists(&tweet.id).await {
285 Ok(true) => {
286 tracing::debug!(tweet_id = %tweet.id, "Tweet already discovered, skipping");
287 return DiscoveryResult::Skipped {
288 tweet_id: tweet.id.clone(),
289 reason: "already discovered".to_string(),
290 };
291 }
292 Ok(false) => {}
293 Err(e) => {
294 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to check tweet existence");
295 }
297 }
298
299 let score_result = self.scorer.score(tweet);
301
302 if let Err(e) = self
304 .storage
305 .store_discovered_tweet(tweet, score_result.total, keyword)
306 .await
307 {
308 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to store discovered tweet");
309 }
310
311 if !score_result.meets_threshold {
313 tracing::debug!(
314 tweet_id = %tweet.id,
315 score = score_result.total,
316 threshold = self.threshold,
317 "Tweet scored below threshold, skipping"
318 );
319 return DiscoveryResult::BelowThreshold {
320 tweet_id: tweet.id.clone(),
321 score: score_result.total,
322 };
323 }
324
325 if self.safety.has_replied_to(&tweet.id).await {
327 return DiscoveryResult::Skipped {
328 tweet_id: tweet.id.clone(),
329 reason: "already replied".to_string(),
330 };
331 }
332
333 if !self.safety.can_reply().await {
334 return DiscoveryResult::Skipped {
335 tweet_id: tweet.id.clone(),
336 reason: "rate limited".to_string(),
337 };
338 }
339
340 let reply_output = match self
342 .generator
343 .generate_reply_with_rag(&tweet.text, &tweet.author_username, true)
344 .await
345 {
346 Ok(output) => output,
347 Err(e) => {
348 tracing::error!(
349 tweet_id = %tweet.id,
350 error = %e,
351 "Failed to generate reply"
352 );
353 return DiscoveryResult::Failed {
354 tweet_id: tweet.id.clone(),
355 error: e.to_string(),
356 };
357 }
358 };
359 let reply_text = reply_output.text;
360
361 tracing::info!(
362 author = %tweet.author_username,
363 score = format!("{:.0}", score_result.total),
364 "Posted reply to @{}",
365 tweet.author_username,
366 );
367
368 if self.dry_run {
369 tracing::info!(
370 "DRY RUN: Tweet {} by @{} scored {:.0}/100 -- Would reply: \"{}\"",
371 tweet.id,
372 tweet.author_username,
373 score_result.total,
374 reply_text
375 );
376
377 let _ = self
378 .storage
379 .log_action(
380 "discovery_reply",
381 "dry_run",
382 &format!(
383 "Score {:.0}, reply to @{}: {}",
384 score_result.total,
385 tweet.author_username,
386 truncate(&reply_text, 50)
387 ),
388 )
389 .await;
390 } else {
391 if let Err(e) = self.poster.send_reply(&tweet.id, &reply_text).await {
392 tracing::error!(tweet_id = %tweet.id, error = %e, "Failed to send reply");
393 return DiscoveryResult::Failed {
394 tweet_id: tweet.id.clone(),
395 error: e.to_string(),
396 };
397 }
398
399 if let Err(e) = self.safety.record_reply(&tweet.id, &reply_text).await {
400 tracing::warn!(tweet_id = %tweet.id, error = %e, "Failed to record reply");
401 }
402
403 let _ = self
404 .storage
405 .log_action(
406 "discovery_reply",
407 "success",
408 &format!(
409 "Score {:.0}, replied to @{}: {}",
410 score_result.total,
411 tweet.author_username,
412 truncate(&reply_text, 50)
413 ),
414 )
415 .await;
416 }
417
418 DiscoveryResult::Replied {
419 tweet_id: tweet.id.clone(),
420 author: tweet.author_username.clone(),
421 score: score_result.total,
422 reply_text,
423 }
424 }
425}
426
/// Shortens `s` to at most `max_len` bytes, appending `"..."` when anything is cut.
///
/// The cut point moves back to the nearest UTF-8 character boundary, so
/// multi-byte characters (accents, emoji) are never split. Slicing at a
/// non-boundary index would panic. Strings that already fit are returned
/// unchanged. Because of the ellipsis, the result can be up to `max_len + 3`
/// bytes long.
pub(crate) fn truncate(s: &str, max_len: usize) -> String {
    if s.len() <= max_len {
        return s.to_string();
    }
    // Index 0 is always a char boundary, so this loop terminates.
    let mut end = max_len;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}...", &s[..end])
}
435
436#[cfg(test)]
437mod tests;