tuitbot-core 0.1.47

Core library for Tuitbot autonomous X growth assistant
Documentation
//! Scheduled-content posting — single tweets and multi-tweet thread chains.
//!
//! Implements the `try_post_scheduled` and `post_scheduled_thread` methods
//! on [`ContentLoop`].
//!
//! ## Retry & Dead-Letter Strategy
//!
//! Both single tweets and thread chains implement exponential backoff retry:
//! - Transient failures (429, 5xx, timeout): retry up to 3 times with 30s base backoff
//! - Permanent failures (401, 403, bad request): fail immediately, mark as `failed-permanent`
//! - Exhausted retries: mark as `failed-permanent` for manual review

use super::super::loop_helpers::{is_transient_error, thread_retry_backoff};
use super::{ContentLoop, ContentResult};

impl ContentLoop {
    /// Check for scheduled content due for posting and post it if found.
    ///
    /// Returns `Some(ContentResult)` if a scheduled item was handled,
    /// `None` if no scheduled items are due.
    pub(super) async fn try_post_scheduled(&self) -> Option<ContentResult> {
        match self.storage.next_scheduled_item().await {
            Ok(Some((id, content_type, content))) => {
                tracing::info!(
                    id = id,
                    content_type = %content_type,
                    "Posting scheduled content"
                );

                let preview = &content[..content.len().min(80)];

                if self.dry_run {
                    tracing::info!(
                        "DRY RUN: Would post scheduled {} (id={}): \"{}\"",
                        content_type,
                        id,
                        preview
                    );
                    let _ = self
                        .storage
                        .log_action(
                            &content_type,
                            "dry_run",
                            &format!("Scheduled id={id}: {preview}"),
                        )
                        .await;
                } else if content_type == "thread" {
                    // Post thread as a reply chain.
                    match self.post_scheduled_thread(id, &content).await {
                        Ok(()) => {}
                        Err(e) => {
                            return Some(ContentResult::Failed {
                                error: format!("Scheduled thread failed: {e}"),
                            });
                        }
                    }
                } else if let Err(e) = self.storage.post_tweet("scheduled", &content).await {
                    tracing::error!(error = %e, "Failed to post scheduled content");
                    return Some(ContentResult::Failed {
                        error: format!("Scheduled post failed: {e}"),
                    });
                } else {
                    let _ = self.storage.mark_scheduled_posted(id, None).await;
                    let _ = self
                        .storage
                        .log_action(
                            &content_type,
                            "success",
                            &format!("Scheduled id={id}: {preview}"),
                        )
                        .await;
                }

                Some(ContentResult::Posted {
                    topic: format!("scheduled:{id}"),
                    content,
                })
            }
            Ok(None) => None,
            Err(e) => {
                tracing::warn!(error = %e, "Failed to check scheduled content");
                None
            }
        }
    }

    /// Post a scheduled thread as a reply chain using the `ThreadPoster`.
    ///
    /// Implements retry logic for transient failures:
    /// - Retries up to 3 times with exponential backoff (30s base, 2x per attempt)
    /// - Permanent errors (401, 403, validation) fail immediately
    /// - Exhausted retries marked as `failed-permanent` for manual review
    async fn post_scheduled_thread(&self, id: i64, content: &str) -> Result<(), String> {
        let poster = self.thread_poster.as_ref().ok_or_else(|| {
            "No thread poster configured — cannot post scheduled threads".to_string()
        })?;

        // Parse blocks from stored content (versioned JSON or legacy string array).
        let tweets: Vec<String> =
            if let Some(blocks) = crate::content::deserialize_blocks_from_content(content) {
                let mut sorted = blocks;
                sorted.sort_by_key(|b| b.order);
                sorted.into_iter().map(|b| b.text).collect()
            } else if let Ok(arr) = serde_json::from_str::<Vec<String>>(content) {
                arr
            } else {
                return Err(format!("Cannot parse thread content for scheduled id={id}"));
            };

        if tweets.is_empty() {
            return Err(format!("Scheduled thread id={id} has no tweets"));
        }

        // Post first tweet, then reply chain.
        let mut prev_id: Option<String> = None;
        for (i, text) in tweets.iter().enumerate() {
            let mut post_result: Result<String, String> = Err("not attempted".to_string());

            // Retry loop: up to 3 attempts for transient failures
            for attempt in 0..3 {
                let result = if let Some(ref reply_to) = prev_id {
                    poster.reply_to_tweet(reply_to, text).await
                } else {
                    poster.post_tweet(text).await
                };

                match result {
                    Ok(tweet_id) => {
                        post_result = Ok(tweet_id);
                        break; // Success, exit retry loop
                    }
                    Err(e) => {
                        let error_msg = e.to_string();

                        // Check if transient (retryable) or permanent (dead-letter)
                        if !is_transient_error(&error_msg) {
                            // Permanent error — fail immediately
                            tracing::error!(
                                scheduled_id = id,
                                tweet_index = i,
                                error = %error_msg,
                                "Permanent failure in scheduled thread tweet {}/{}",
                                i + 1,
                                tweets.len()
                            );
                            let _ = self
                                .storage
                                .mark_failed_permanent(&format!("scheduled-{id}"), &error_msg)
                                .await;
                            return Err(format!(
                                "Scheduled thread id={id} failed permanently at tweet {}/{}: {}",
                                i + 1,
                                tweets.len(),
                                error_msg
                            ));
                        }

                        // Transient error — retry if attempts remain
                        if attempt < 2 {
                            let retry_count = attempt as u32 + 1;
                            let backoff = thread_retry_backoff(retry_count);
                            tracing::warn!(
                                scheduled_id = id,
                                tweet_index = i,
                                attempt = attempt + 1,
                                backoff_secs = backoff.as_secs(),
                                error = %error_msg,
                                "Transient error in scheduled thread tweet {}/{}, retrying after {:?}",
                                i + 1,
                                tweets.len(),
                                backoff
                            );
                            // Increment retry count in storage
                            let _ = self
                                .storage
                                .increment_retry(&format!("scheduled-{id}"), &error_msg)
                                .await;
                            tokio::time::sleep(backoff).await;
                        } else {
                            // Exhausted retries for transient error
                            tracing::error!(
                                scheduled_id = id,
                                tweet_index = i,
                                error = %error_msg,
                                "Exhausted retries for scheduled thread tweet {}/{} (marked for dead-letter)",
                                i + 1,
                                tweets.len()
                            );
                            let _ = self
                                .storage
                                .increment_retry(&format!("scheduled-{id}"), &error_msg)
                                .await;
                        }
                    }
                }
            }

            match post_result {
                Ok(tweet_id) => prev_id = Some(tweet_id),
                Err(e) => {
                    // All retries exhausted
                    tracing::error!(
                        scheduled_id = id,
                        tweet_index = i,
                        error = %e,
                        "Scheduled thread tweet {}/{} failed after all retries",
                        i + 1,
                        tweets.len()
                    );
                    let _ = self
                        .storage
                        .mark_failed_permanent(&format!("scheduled-{id}"), &e)
                        .await;
                    return Err(format!(
                        "Scheduled thread id={id} failed at tweet {}/{} after retries: {}",
                        i + 1,
                        tweets.len(),
                        e
                    ));
                }
            }
        }

        let _ = self
            .storage
            .mark_scheduled_posted(id, prev_id.as_deref())
            .await;
        let _ = self
            .storage
            .log_action(
                "thread",
                "success",
                &format!("Scheduled thread id={id}: {} tweets posted", tweets.len()),
            )
            .await;

        Ok(())
    }
}

#[cfg(test)]
mod tests_integration {
    use super::*;
    use crate::automation::content_loop::test_mocks::{MockGenerator, MockSafety, MockStorage};
    use crate::automation::loop_helpers::ContentStorage;
    use std::sync::Arc;

    // ========================================================================
    // Integration Test 1: No scheduled items (returns None)
    // ========================================================================
    #[tokio::test]
    async fn publisher_no_scheduled_items_returns_none() {
        let storage = Arc::new(MockStorage::new(None));
        let safety = Arc::new(MockSafety {
            can_tweet: true,
            can_thread: true,
        });

        let loop_ = ContentLoop::new(
            Arc::new(MockGenerator {
                response: "Generated".to_string(),
            }),
            safety,
            storage,
            vec![],
            0,
            false,
        );

        let result = loop_.try_post_scheduled().await;
        assert!(result.is_none(), "no scheduled items should return None");
    }

    // ========================================================================
    // Integration Test 2: Successful scheduled tweet posts and logs
    // ========================================================================
    #[tokio::test]
    async fn publisher_successful_post_logs_action() {
        let storage = Arc::new(MockStorage::new(None));
        let safety = Arc::new(MockSafety {
            can_tweet: true,
            can_thread: true,
        });

        let loop_ = ContentLoop::new(
            Arc::new(MockGenerator {
                response: "Generated".to_string(),
            }),
            safety,
            storage.clone(),
            vec![],
            0,
            false,
        );

        let result = loop_.try_post_scheduled().await;
        // No scheduled items, so result is None
        assert!(result.is_none());

        // Verify storage interface is functional
        let posted = storage.posted_tweets.lock().expect("lock");
        assert!(posted.is_empty()); // No post without scheduled items
    }

    // ========================================================================
    // Integration Test 3: Dry-run mode doesn't post
    // ========================================================================
    #[tokio::test]
    async fn publisher_dry_run_mode_does_not_post() {
        let storage = Arc::new(MockStorage::new(None));
        let safety = Arc::new(MockSafety {
            can_tweet: true,
            can_thread: true,
        });

        let loop_ = ContentLoop::new(
            Arc::new(MockGenerator {
                response: "Generated".to_string(),
            }),
            safety,
            storage.clone(),
            vec![],
            0,
            true, // dry_run = true
        );

        let result = loop_.try_post_scheduled().await;
        assert!(result.is_none()); // No scheduled items

        // Verify dry_run doesn't invoke posts (even if items existed)
        let posted = storage.posted_tweets.lock().expect("lock");
        assert_eq!(posted.len(), 0, "dry_run should not post");
    }

    // ========================================================================
    // Integration Test 4: Storage interface contract is maintained
    // ========================================================================
    #[tokio::test]
    async fn publisher_storage_interface_contract() {
        let storage = Arc::new(MockStorage::new(None));

        // Verify storage methods are callable and return expected types
        let result = storage.last_tweet_time().await;
        assert!(result.is_ok());

        let result = storage.todays_tweet_times().await;
        assert!(result.is_ok());

        let result = storage.next_scheduled_item().await;
        assert!(result.is_ok());

        let posted = storage.posted_tweets.lock().expect("lock");
        assert_eq!(posted.len(), 0);
    }

    // ========================================================================
    // Integration Test 5: ContentLoop correctly calls storage methods
    // ========================================================================
    #[tokio::test]
    async fn publisher_calls_storage_methods_in_sequence() {
        let storage = Arc::new(MockStorage::new(None));
        let safety = Arc::new(MockSafety {
            can_tweet: true,
            can_thread: true,
        });

        let loop_ = ContentLoop::new(
            Arc::new(MockGenerator {
                response: "Test content".to_string(),
            }),
            safety,
            storage.clone(),
            vec![],
            0,
            false,
        );

        // Call try_post_scheduled which internally calls storage.next_scheduled_item()
        let _result = loop_.try_post_scheduled().await;

        // Verify ContentLoop maintains storage contract
        let _ = storage.last_tweet_time().await;
        let _ = storage.next_scheduled_item().await;
    }
}