terrazzo-terminal 0.2.7

A simple web-based terminal emulator built on Terrazzo.
#![cfg(feature = "client")]

use std::collections::HashMap;
use std::collections::hash_map;
use std::future::ready;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;

use futures::SinkExt;
use futures::StreamExt as _;
use futures::channel::mpsc;
use scopeguard::defer;
use server_fn::ServerFnError;
use terrazzo::autoclone;
use terrazzo::prelude::Ptr;
use terrazzo::prelude::diagnostics;
use wasm_bindgen_futures::spawn_local;

use self::diagnostics::Instrument as _;
use self::diagnostics::debug;
use self::diagnostics::debug_span;
use self::diagnostics::trace;
use self::diagnostics::warn;
use super::server_fn::NotifyRequest;
use super::server_fn::NotifyResponse;
use crate::frontend::remotes::Remote;
use crate::text_editor::file_path::FilePath;

/// Client-side entry point for watching files and folders on a remote.
///
/// The backing [`NotifyServiceImpl`] is created lazily the first time it is
/// needed and is reset to `None` when the response stream reports an error.
pub(in crate::text_editor) struct NotifyService {
    /// Remote passed to the server in the initial `NotifyRequest::Start`.
    remote: Remote,
    /// Lazily created implementation; `None` until first use or after a stream error.
    inner: Ptr<Mutex<Option<NotifyServiceImpl>>>,
}

/// State of one running notify stream: the outgoing request channel and the
/// registered handlers that incoming responses are dispatched to.
struct NotifyServiceImpl {
    /// Sending half of the channel feeding the request stream given to the `notify` server function.
    request: mpsc::UnboundedSender<Result<NotifyRequest, ServerFnError>>,
    /// Registered callbacks, shared with the task that reads responses.
    handlers: Handlers,
}

/// Registrations grouped by watched path, then by registration id.
///
/// Only weak references are stored, so this map does not keep a
/// [`NotifyRegistration`] alive; entries are removed when the registration is dropped.
type Handlers =
    Arc<Mutex<HashMap<FilePath<Arc<Path>>, HashMap<usize, std::rc::Weak<NotifyRegistration>>>>>;

/// Handle for one registered watch callback.
///
/// Dropping the handle removes the callback from the service; when it was the
/// last registration for its path, a `NotifyRequest::UnWatch` is sent.
#[must_use]
pub struct NotifyRegistration {
    /// Process-wide unique id, used as the key inside the per-path handler map.
    id: usize,
    /// Path this registration watches.
    full_path: FilePath<Arc<Path>>,
    /// Weak back-reference to the owning service, used on drop to unregister.
    notify_service: std::rc::Weak<NotifyService>,
    /// Whether the path is watched as a file or as a folder.
    registration_type: RegistrationType,
    /// Invoked with every response that matches this registration.
    callback: Box<dyn Fn(&NotifyResponse)>,
}

/// How a response path is matched against a registration's watched path.
#[derive(Clone, Copy, Debug)]
enum RegistrationType {
    /// Matches only when the response path equals the watched path.
    File,
    /// Matches when the response path equals the watched path or its parent is the watched path.
    Folder,
}

impl NotifyService {
    pub fn new(remote: Remote) -> Self {
        Self {
            remote: remote.clone(),
            inner: Ptr::new(Mutex::new(None)),
        }
    }

    fn inner<R>(&self, f: impl FnOnce(&mut NotifyServiceImpl) -> R) -> R {
        let mut inner = self.inner.lock().unwrap();
        let inner = &mut *inner;
        let inner =
            inner.get_or_insert_with(|| NotifyServiceImpl::new(self.remote.clone(), &self.inner));
        f(inner)
    }

    #[must_use]
    pub fn watch_file(
        self: &Ptr<Self>,
        full_path: &FilePath<Arc<Path>>,
        callback: impl Fn(&NotifyResponse) + 'static,
    ) -> Ptr<NotifyRegistration> {
        self.add_handler(full_path, RegistrationType::File, callback)
    }

    #[must_use]
    pub fn watch_folder(
        self: &Ptr<Self>,
        full_path: &FilePath<Arc<Path>>,
        callback: impl Fn(&NotifyResponse) + 'static,
    ) -> Ptr<NotifyRegistration> {
        self.add_handler(full_path, RegistrationType::Folder, callback)
    }

    #[must_use]
    fn add_handler(
        self: &Ptr<Self>,
        full_path: &FilePath<Arc<Path>>,
        registration_type: RegistrationType,
        callback: impl Fn(&NotifyResponse) + 'static,
    ) -> Ptr<NotifyRegistration> {
        let _span = debug_span!("Add watch", ?full_path, ?registration_type).entered();
        debug!("Start");
        defer!(debug!("End"));
        let registration =
            NotifyRegistration::new(full_path.clone(), self, registration_type, callback);
        let handlers = self.inner(|inner| inner.handlers.clone());
        let mut handlers = handlers.lock().unwrap();
        let mut handlers = match handlers.entry(full_path.clone()) {
            hash_map::Entry::Occupied(entry) => {
                debug!("Adding new to exiting watch");
                entry
            }
            hash_map::Entry::Vacant(entry) => {
                debug!("Spawning new watch");
                self.send(Ok(NotifyRequest::Watch {
                    full_path: full_path.clone(),
                }));
                entry.insert_entry(HashMap::new())
            }
        };
        handlers
            .get_mut()
            .insert(registration.id, Ptr::downgrade(&registration));
        return registration;
    }

    fn send(&self, notify_request: Result<NotifyRequest, ServerFnError>) {
        let mut request = self.inner(|inner| inner.request.clone());
        let send_task = async move {
            let () = request
                .send(notify_request)
                .await
                .unwrap_or_else(|error| warn!("Failed to send notify request: {error}"));
        };
        spawn_local(send_task.in_current_span());
    }
}

impl NotifyServiceImpl {
    #[autoclone]
    fn new(remote: Remote, inner: &Ptr<Mutex<Option<NotifyServiceImpl>>>) -> Self {
        let (request_tx, request_rx) = mpsc::unbounded();
        let handlers = Handlers::default();
        let request =
            futures::stream::once(ready(Ok(NotifyRequest::Start { remote }))).chain(request_rx);
        #[cfg(debug_assertions)]
        let request = request.inspect(|r| debug!("Notify request: {r:?}"));
        let task = async move {
            autoclone!(inner, handlers);
            debug!("Start");
            defer!(debug!("End"));
            let Ok(mut response) = super::server_fn::notify(request.into())
                .await
                .inspect_err(|error| warn!("Notify stream failed: {error}"))
            else {
                return;
            };
            while let Some(response) = response.next().await {
                match response {
                    Ok(response) => {
                        debug!("{response:?}");
                        let response_path_parent = response.path.parent();
                        let handlers = {
                            let lock = handlers.lock().unwrap();
                            (*lock).clone()
                        };
                        for (full_path, handlers) in handlers {
                            let full_path = full_path.as_deref().full_path();
                            for handler in handlers.values() {
                                let Some(handler) = handler.upgrade() else {
                                    continue;
                                };
                                if match handler.registration_type {
                                    RegistrationType::File => {
                                        full_path.as_path() == response.path.as_ref()
                                    }
                                    RegistrationType::Folder => {
                                        full_path.as_path() == response.path.as_ref()
                                            || Some(full_path.as_ref()) == response_path_parent
                                    }
                                } {
                                    let callback = &*handler.callback;
                                    callback(&response)
                                }
                            }
                        }
                    }
                    Err(error) => {
                        warn!("{error:?}");
                        inner.lock().unwrap().take();
                        return;
                    }
                }
            }
        };
        spawn_local(task.in_current_span());
        Self {
            request: request_tx,
            handlers,
        }
    }
}

impl NotifyRegistration {
    /// Builds a registration for `full_path` with a fresh id taken from a
    /// process-wide counter.
    ///
    /// Only a weak reference to `notify_service` is kept. The registration is
    /// not added to the service's handler map here.
    fn new(
        full_path: FilePath<Arc<Path>>,
        notify_service: &Ptr<NotifyService>,
        registration_type: RegistrationType,
        callback: impl Fn(&NotifyResponse) + 'static,
    ) -> Ptr<Self> {
        use std::sync::atomic::AtomicUsize;
        use std::sync::atomic::Ordering;

        // Shared by every registration, so ids are unique across services.
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

        let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
        debug!(id, "Create notify registration");
        let notify_service = Ptr::downgrade(notify_service);
        let callback: Box<dyn Fn(&NotifyResponse)> = Box::new(callback);
        Ptr::new(Self {
            id,
            full_path,
            notify_service,
            registration_type,
            callback,
        })
    }
}

impl Drop for NotifyRegistration {
    /// Unregisters this callback and, when it was the last one for its path,
    /// asks the server to stop watching that path.
    ///
    /// Does nothing when the owning service has already been dropped.
    fn drop(&mut self) {
        let _span = debug_span!("Drop notify registration", id = self.id).entered();
        debug!("Start");
        defer!(debug!("End"));
        let notify_service = match self.notify_service.upgrade() {
            Some(notify_service) => notify_service,
            None => {
                trace!("Notify service is dropped");
                return;
            }
        };
        trace!("Getting handlers");
        // Scoped so the handlers lock is released before the unwatch request is sent.
        {
            // NOTE(review): `inner()` creates the implementation when it is absent, so a drop that
            // happens after a stream error appears to start a new stream — confirm this is intended.
            let shared = notify_service.inner(|inner| inner.handlers.clone());
            trace!("Acquire lock");
            let mut by_path = shared.lock().unwrap();
            trace!("Removing registration");
            match by_path.get_mut(&self.full_path) {
                None => {
                    warn!("Registrations not found for {:?}", self.full_path);
                    return;
                }
                Some(by_id) => {
                    by_id.remove(&self.id);
                    // Other registrations still watch this path: keep the watch alive.
                    if !by_id.is_empty() {
                        return;
                    }
                }
            }
            by_path.remove(&self.full_path);
        }
        notify_service.send(Ok(NotifyRequest::UnWatch {
            full_path: self.full_path.clone(),
        }));
    }
}