rolldown 0.1.0

Fast JavaScript bundler in Rust, designed for the future of Vite
Documentation
use crate::watch::event::{BundleEvent, WatcherChangeData, WatcherEvent};
use anyhow::Context;
use arcstr::ArcStr;
#[cfg(not(target_family = "wasm"))]
use notify::Watcher as _;
use notify::{Config, RecommendedWatcher, event::ModifyKind};

use rolldown_common::{NotifyOption, WatcherChangeKind};
use rolldown_error::BuildResult;
use rolldown_utils::dashmap::FxDashSet;
use std::{
  ops::Deref,
  sync::{
    Arc,
    atomic::{AtomicBool, Ordering},
    mpsc::{Receiver, Sender, channel},
  },
  time::Duration,
};
use tokio::sync::Mutex;

use crate::Bundler;

use anyhow::Result;

use super::{
  emitter::{SharedWatcherEmitter, WatcherEmitter},
  watcher_task::WatcherTask,
};

enum WatcherChannelMsg {
  NotifyEvent(notify::Result<notify::Event>),
  Close,
}
enum ExecChannelMsg {
  Exec,
  Close,
}
pub struct WatcherImpl {
  pub emitter: SharedWatcherEmitter,
  tasks: Vec<WatcherTask>,
  notify_watcher: Arc<Mutex<RecommendedWatcher>>,
  running: AtomicBool,
  watch_changes: FxDashSet<WatcherChangeData>,
  tx: Arc<Sender<WatcherChannelMsg>>,
  rx: Arc<Mutex<Receiver<WatcherChannelMsg>>>,
  exec_tx: Arc<Sender<ExecChannelMsg>>,
  exec_rx: Arc<Mutex<Receiver<ExecChannelMsg>>>,
  // debounce invalidating
  invalidating: AtomicBool,
  bundlers: Vec<Arc<Mutex<Bundler>>>,
}

impl WatcherImpl {
  #[expect(clippy::needless_pass_by_value)]
  pub fn new(
    bundlers: Vec<Arc<Mutex<Bundler>>>,
    notify_option: Option<NotifyOption>,
  ) -> Result<Self> {
    let (tx, rx) = channel();
    let (exec_tx, exec_rx) = channel();
    let tx = Arc::new(tx);
    let cloned_tx = Arc::clone(&tx);
    let watch_option = {
      let config = Config::default();
      if let Some(notify) = &notify_option {
        if let Some(poll_interval) = notify.poll_interval {
          config.with_poll_interval(poll_interval);
        }
        config.with_compare_contents(notify.compare_contents);
      }
      config
    };
    let notify_watcher = Arc::new(Mutex::new(RecommendedWatcher::new(
      move |res| {
        if let Err(e) = tx.send(WatcherChannelMsg::NotifyEvent(res)) {
          eprintln!(
            "Watcher: failed to send file change notification - channel closed while processing file system event: {e:?}"
          );
        }
      },
      watch_option,
    )?));
    let notify_watch_files = Arc::new(FxDashSet::default());
    let emitter = Arc::new(WatcherEmitter::new());

    let tasks = bundlers
      .iter()
      .map(|bundler| {
        WatcherTask::new(
          Arc::clone(bundler),
          Arc::clone(&emitter),
          Arc::clone(&notify_watcher),
          Arc::clone(&notify_watch_files),
        )
      })
      .collect();

    Ok(Self {
      tasks,
      emitter,
      notify_watcher,
      running: AtomicBool::default(),
      watch_changes: FxDashSet::default(),
      rx: Arc::new(Mutex::new(rx)),
      tx: cloned_tx,
      exec_tx: Arc::new(exec_tx),
      exec_rx: Arc::new(Mutex::new(exec_rx)),
      invalidating: AtomicBool::default(),
      bundlers,
    })
  }

  #[tracing::instrument(level = "debug", skip_all)]
  pub fn invalidate(&self, data: Option<WatcherChangeData>) {
    tracing::debug!(name= "watch invalidate", running = ?self.running.load(Ordering::Relaxed));

    if let Some(data) = data {
      self.watch_changes.insert(data);
    }

    if self.running.load(Ordering::Relaxed) || self.invalidating.load(Ordering::Relaxed) {
      return;
    }

    self.invalidating.store(true, Ordering::Relaxed);
    self.exec_tx.send(ExecChannelMsg::Exec).expect(
      "Watcher: failed to send Exec message - watcher event loop terminated while scheduling rebuild"
    );
  }

  #[tracing::instrument(level = "debug", skip_all)]
  pub async fn run(&self, changed_files: &[ArcStr]) -> BuildResult<()> {
    self.emitter.emit(WatcherEvent::Restart)?;

    self.running.store(true, Ordering::Relaxed);
    self.emitter.emit(WatcherEvent::Event(BundleEvent::Start))?;
    for task in &self.tasks {
      task.run(changed_files).await?;
    }

    self.invalidating.store(false, Ordering::Relaxed);
    self.running.store(false, Ordering::Relaxed);
    self.emitter.emit(WatcherEvent::Event(BundleEvent::End))?;

    if !self.watch_changes.is_empty() {
      self.invalidate(None);
    }

    Ok(())
  }

  #[tracing::instrument(level = "debug", skip_all)]
  pub async fn close(&self) -> anyhow::Result<()> {
    // close channel
    self
      .tx
      .send(WatcherChannelMsg::Close)
      .context("Watcher: failed to send Close message - watcher event loop already terminated")?;
    self.exec_tx.send(ExecChannelMsg::Close).context(
      "Watcher: failed to send Close message to exec channel - watcher execution loop already terminated"
    )?;
    // stop watching files
    // TODO the notify watcher should be dropped, because the stop method is private
    let inner = self.notify_watcher.lock().await;
    // The inner mutex should be dropped to avoid deadlock with bundler lock at `Watcher::run`
    std::mem::drop(inner);
    // emit close event
    self.emitter.emit(WatcherEvent::Close)?;
    // call close watcher hook
    for task in &self.tasks {
      task.close().await?;
    }

    Ok(())
  }

  pub async fn start(&self) {
    let build_delay = {
      let mut build_delay: u32 = 0;
      for bundler in &self.bundlers {
        let bundler = bundler.lock().await;
        if let Some(delay) = bundler.options.watch.build_delay {
          if delay > build_delay {
            build_delay = delay;
          }
        }
      }
      build_delay
    };

    let _ = self.run(&[]).await;
    let future = async move {
      let exec_rx = self.exec_rx.lock().await;
      while let Ok(msg) = exec_rx.recv() {
        match msg {
          ExecChannelMsg::Exec => {
            tokio::time::sleep(Duration::from_millis(u64::from(build_delay))).await;
            tracing::debug!(name= "watcher invalidate", watch_changes = ?self.watch_changes);
            let watch_changes =
              self.watch_changes.iter().map(|v| v.deref().clone()).collect::<Vec<_>>();
            for change in &watch_changes {
              for task in &self.tasks {
                task.on_change(change.path.as_str(), change.kind).await;
                task.invalidate(change.path.as_str()).await;
              }
              self.watch_changes.remove(change);
            }
            let changed_files =
              watch_changes.iter().map(|item| item.path.clone()).collect::<Vec<_>>();
            let _ = self.run(&changed_files).await;
          }
          ExecChannelMsg::Close => break,
        }
      }
    };

    rolldown_utils::futures::block_on(future);
  }
}

#[tracing::instrument(level = "debug", skip(watcher))]
pub fn emit_change_event(watcher: &Arc<WatcherImpl>, path: &str, kind: WatcherChangeKind) {
  let _ = watcher
    .emitter
    .emit(WatcherEvent::Change(WatcherChangeData { path: path.into(), kind }))
    .map_err(|e| eprintln!("Rolldown internal error: {e:?}"));
}

#[tracing::instrument(level = "debug", skip_all)]
pub fn wait_for_change(watcher: Arc<WatcherImpl>) {
  let future = async move {
    let mut run = true;
    while run {
      let rx = watcher.rx.lock().await;
      match rx.recv() {
        Ok(msg) => match msg {
          WatcherChannelMsg::NotifyEvent(event) => match event {
            Ok(event) => {
              tracing::debug!(name= "notify event ", event = ?event);
              for path in event.paths {
                let id = path.to_string_lossy();
                if let Some(kind) = match event.kind {
                  notify::EventKind::Create(_) => Some(WatcherChangeKind::Create),
                  notify::EventKind::Modify(
                    ModifyKind::Data(_) | ModifyKind::Any, /* windows*/
                  ) => {
                    tracing::debug!(name= "notify updated content", path = ?id.as_ref(), content= ?std::fs::read_to_string(id.as_ref()).unwrap());
                    Some(WatcherChangeKind::Update)
                  }
                  notify::EventKind::Remove(_) => Some(WatcherChangeKind::Delete),
                  _ => None,
                } {
                  emit_change_event(&watcher, id.as_ref(), kind);
                  watcher.invalidate(Some(WatcherChangeData { path: id.into(), kind }));
                }
              }
            }
            Err(e) => eprintln!("notify error: {e:?}"),
          },
          WatcherChannelMsg::Close => run = false,
        },
        Err(e) => {
          eprintln!("watcher receiver error: {e:?}");
        }
      }
    }
  };
  tokio::spawn(future);
}