supabase-plus 0.4.14

An extra set of tools for managing Supabase projects that goes beyond the capabilities of the regular Supabase CLI.
use std::{path::PathBuf, sync::Arc};

use super::prelude::*;
use crate::abstraction::{CodeWatch, SupabaseProject};

use futures_channel::mpsc::Sender;
use futures_util::{StreamExt, sink::SinkExt};
use tokio::{fs::File, io::AsyncReadExt};
/// Background worker that runs the contents of SQL files through
/// `SupabaseProject::execute_sql`.
///
/// Files are submitted as [`ExecuteEvent`]s via the sender returned by
/// [`SqlFileExecutor::start`] and are handled one at a time, in queue order.
pub struct SqlFileExecutor;

impl SqlFileExecutor {
    /// Spawns the executor task and returns the sending half of its queue.
    ///
    /// The queue is created with a buffer of 1024 events. For every received
    /// event the task prints which file is being handled, reads it, executes
    /// its contents and prints the outcome. A file that cannot be read is
    /// reported on stderr and skipped, so a single bad file does not stop the
    /// executor. The task ends once every sender has been dropped.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime.
    pub fn start() -> Sender<ExecuteEvent> {
        let (execute_queuer, mut execute_queue) =
            futures_channel::mpsc::channel::<ExecuteEvent>(1024);

        tokio::spawn(async move {
            while let Some(ExecuteEvent {
                path,
                immediate_run,
            }) = execute_queue.next().await
            {
                if immediate_run {
                    println!("🛫 Executing file immediately ({})", path.to_string_lossy());
                } else {
                    println!("🔍 Change observed ({})", path.to_string_lossy());
                }

                // The file may have been removed or replaced between the event
                // being queued and this read; report the failure and keep
                // serving the queue instead of panicking the whole task.
                let sql = match Self::read_sql(&path).await {
                    Ok(sql) => sql,
                    Err(err) => {
                        eprintln!("❌ Could not read {}: {}\n", path.to_string_lossy(), err);
                        continue;
                    }
                };

                // NOTE(review): the literal `E` prefix in the failure message is
                // kept exactly as it was — confirm it is intentional.
                match SupabaseProject::execute_sql(&sql).await {
                    Err(err) => eprintln!("❌ E{}\n", err),
                    Ok(_) => println!("✅ Query run successfully\n"),
                }
            }
        });

        execute_queuer
    }

    /// Reads the whole file at `path` into a `String`.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error when the file cannot be opened or read,
    /// or when its contents are not valid UTF-8.
    async fn read_sql(path: &std::path::Path) -> std::io::Result<String> {
        let mut file = File::open(path).await?;
        let mut sql = String::new();
        file.read_to_string(&mut sql).await?;
        Ok(sql)
    }
}

/// A request to execute the SQL file at `path`.
///
/// `immediate_run` records why the file was queued: `true` when it was queued
/// explicitly for an immediate run, `false` when it was queued because a
/// change to the file was observed.
#[derive(Debug, Default, PartialEq, Eq, Hash)]
pub struct ExecuteEvent {
    path: Arc<PathBuf>,
    immediate_run: bool,
}

impl ExecuteEvent {
    /// Creates an event for a file that should be executed right away
    /// (`immediate_run` is `true`).
    pub fn immediate(path: Arc<PathBuf>) -> Self {
        Self {
            path,
            immediate_run: true,
        }
    }

    /// Creates an event for a file whose change was observed
    /// (`immediate_run` is `false`).
    pub fn watched(path: Arc<PathBuf>) -> Self {
        Self {
            path,
            immediate_run: false,
        }
    }
}

#[async_trait]
impl CliSubcommand for Watch {
    /// Runs the `watch` subcommand.
    ///
    /// Starts the SQL executor, builds a `CodeWatch` for the `sql` extension
    /// over `self.directory` that queues [`ExecuteEvent::watched`] events, and
    /// — when `self.immediate` is set — first queues every `*.sql` file found
    /// recursively under the directory as an [`ExecuteEvent::immediate`]
    /// event. It then runs the watcher until it finishes.
    ///
    /// # Panics
    ///
    /// Panics if an event cannot be queued (the executor task is gone) or if
    /// the watcher returns an error.
    async fn run(self: Box<Self>) {
        let mut queuer = SqlFileExecutor::start();

        let codewatch = CodeWatch::default()
            .extension("sql")
            .queuer(queuer.clone())
            .build(&self.directory, ExecuteEvent::watched);

        if self.immediate {
            // Escape glob metacharacters (`[`, `]`, `*`, `?`) so a directory
            // whose name contains them is matched literally instead of being
            // interpreted as part of the pattern.
            let root = glob::Pattern::escape(&self.directory.to_string());
            let entries =
                glob::glob(&format!("{}/**/*.sql", root)).expect("Invalid directory path");

            for entry in entries {
                match entry {
                    Ok(path) => queuer
                        .send(ExecuteEvent::immediate(Arc::new(path)))
                        .await
                        .unwrap(),
                    // Entries that could not be inspected (e.g. permission
                    // denied) are reported rather than dropped silently.
                    Err(err) => eprintln!("⚠️ Skipping unreadable path: {}", err),
                }
            }
        }

        codewatch.run().await.unwrap().unwrap();
    }
}