supabase-plus 0.8.13

An extra set of tools for managing Supabase projects going beyond the possibilities of regular Supabase CLI
use anyhow::Context;
use derive_setters::Setters;
use futures_channel::mpsc::{Receiver, Sender, channel};
use futures_util::{SinkExt, StreamExt, pin_mut};
use notify_types::event::{CreateKind, ModifyKind};
use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration};
use tokio::time::sleep;
use watchexec::Watchexec;
use watchexec_events::Tag;
use watchexec_signals::Signal;

use crate::errors::NoWay;

#[derive(Setters, Default)]
#[setters(strip_option)]
pub struct CodeWatch<T> {
    #[setters(skip)]
    watcher: Option<Arc<Watchexec>>,

    #[setters(skip)]
    dedup: Option<(Sender<Arc<PathBuf>>, Receiver<Arc<PathBuf>>)>,

    #[setters(into)]
    extension: Option<String>,

    queuer: Option<Sender<T>>,

    ctor: Option<WatcherCtor<T>>,
}

type WatcherCtor<T> = Arc<dyn Fn(Arc<PathBuf>) -> T + Send + Sync + 'static>;

impl<T: Send + Sync + 'static + Eq + std::hash::Hash> CodeWatch<T> {
    pub fn build(
        mut self,
        path: &str,
        ctor: impl Fn(Arc<PathBuf>) -> T + Send + Sync + 'static + Clone,
    ) -> anyhow::Result<Self> {
        self.dedup = Some(channel::<Arc<PathBuf>>(1024));
        self.ctor = Some(Arc::new(ctor));

        let maybe_expected_extension = Arc::new(self.extension.clone());

        let watcher = Watchexec::new({
            let dedup_queuer = self
                .dedup
                .as_ref()
                .no_way_because("the dedup value has been just set")
                .0
                .clone();

            let maybe_expected_extension = maybe_expected_extension.clone();

            move |mut action| {
                for event in action.events.iter() {
                    for tag in &event.tags {
                        let Tag::FileEventKind(kind) = tag else {
                            continue;
                        };

                        if !matches!(
                            kind,
                            notify_types::event::EventKind::Create(CreateKind::File)
                                | notify_types::event::EventKind::Modify(ModifyKind::Data(_))
                        ) {
                            continue;
                        };

                        for (path, file_type) in event.paths() {
                            if !matches!(file_type, Some(watchexec_events::FileType::File)) {
                                continue;
                            }

                            let Some(extension) = path.extension() else {
                                continue;
                            };

                            if let Some(expected_extension) = maybe_expected_extension.as_ref() {
                                if !extension.to_string_lossy().ends_with(expected_extension) {
                                    continue;
                                }
                            }

                            let mut dedup_queuer = dedup_queuer.clone();
                            let path = Arc::new(path.to_owned());

                            tokio::spawn(async move {
                                dedup_queuer
                                    .send(path)
                                    .await
                                    .no_way_because("the receiver should still be alive by design");
                            });
                        }
                    }
                }

                if action.signals().any(|sig| sig == Signal::Interrupt) {
                    action.quit();
                }

                action
            }
        })
        .context("An error occurred while building the file watcher")?;

        watcher.config.pathset([path]);
        self.watcher = Some(watcher);

        Ok(self)
    }

    pub fn run(mut self) -> tokio::task::JoinHandle<Result<(), watchexec::error::CriticalError>> {
        tokio::spawn({
            let ctor: WatcherCtor<T> = Arc::clone(
                &self
                    .ctor
                    .no_way_because("the codebase should have run `build` method first"),
            );

            let mut dedup_queue = self
                .dedup
                .no_way_because("the codebase should have run `build` method first")
                .1;

            async move {
                loop {
                    let deadline = sleep(Duration::from_millis(16));

                    let mut batch = Vec::with_capacity(1024);

                    let batch_fut = dedup_queue
                        .by_ref()
                        .take(1024)
                        .take_until(deadline)
                        .collect::<Vec<_>>();

                    pin_mut!(batch_fut);

                    tokio::select! {
                        mut rest = batch_fut => {
                            batch.extend(rest.drain(..));
                        }
                        _ = tokio::signal::ctrl_c() => {
                            eprintln!(" terminated watcher");
                            break;
                        }
                    }

                    if batch.is_empty() {
                        continue;
                    }

                    let deduped = batch
                        .into_iter()
                        .map(|path| ctor(path))
                        .collect::<HashSet<_>>();

                    if let Some(queuer) = &mut self.queuer {
                        for item in deduped {
                            queuer
                                .send(item)
                                .await
                                .no_way_because("the receiver should still be alive by design");
                        }
                    }
                }
            }
        });

        self.watcher
            .no_way_because("the codebase should have run `build` method first")
            .main()
    }
}