fn0 0.2.13

FaaS platform powered by wasmtime
use crate::turso_queue::TursoQueue;
use anyhow::Result;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use tokio_util::sync::CancellationToken;

pub struct QueuePoller {
    store: TursoQueue,
    poll_interval: Duration,
}

impl QueuePoller {
    pub fn new(turso_url: &str, auth_token: &str) -> Self {
        Self {
            store: TursoQueue::new(turso_url, auth_token),
            poll_interval: Duration::from_secs(1),
        }
    }

    pub fn store(&self) -> &TursoQueue {
        &self.store
    }

    pub async fn run<F>(&self, execute_fn: F, shutdown: CancellationToken)
    where
        F: Fn(String, String) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>,
    {
        if let Err(e) = self.store.create_tables().await {
            tracing::error!("Failed to create queue tables: {e}");
        }

        loop {
            if shutdown.is_cancelled() {
                break;
            }

            match self.store.claim_task().await {
                Ok(Some(task)) => {
                    let result =
                        (execute_fn)(task.task_name.clone(), task.payload.clone()).await;

                    match result {
                        Ok(()) => {
                            if let Err(e) = self.store.delete_task(&task.id).await {
                                tracing::error!("Failed to delete completed task {}: {e}", task.id);
                            }
                        }
                        Err(e) => {
                            let error_msg = format!("{e:?}");
                            if task.retry_count + 1 >= task.max_retries {
                                if let Err(e) =
                                    self.store.move_to_dead_queue(&task, &error_msg).await
                                {
                                    tracing::error!(
                                        "Failed to move task {} to dead queue: {e}",
                                        task.id
                                    );
                                }
                            } else {
                                if let Err(e) = self.store.retry_task(&task.id).await {
                                    tracing::error!(
                                        "Failed to retry task {}: {e}",
                                        task.id
                                    );
                                }
                            }
                        }
                    }
                }
                Ok(None) => {
                    tokio::select! {
                        _ = tokio::time::sleep(self.poll_interval) => {}
                        _ = shutdown.cancelled() => { break; }
                    }
                }
                Err(e) => {
                    tracing::error!("Failed to claim task: {e}");
                    tokio::select! {
                        _ = tokio::time::sleep(self.poll_interval) => {}
                        _ = shutdown.cancelled() => { break; }
                    }
                }
            }
        }
    }
}