glycin 3.0.0-alpha

Sandboxed image decoding
Documentation
static DEFAULT_POOL: LazyLock<Arc<Pool>> = LazyLock::new(Pool::new);

use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::{Arc, LazyLock, Mutex};
use std::time::{Duration, Instant};

use gio::glib;
use gio::prelude::*;

use crate::config::{ConfigEntry, ConfigEntryHash};
use crate::dbus::ZbusProxy;
use crate::util::AsyncMutex;
use crate::{config, dbus, Error, SandboxMechanism};

#[derive(Debug)]
pub struct PooledProcess<P: ZbusProxy<'static> + 'static> {
    last_use: Mutex<Instant>,
    process: Arc<dbus::RemoteProcess<P>>,
    users: Vec<std::sync::Weak<()>>,
}

impl<P: ZbusProxy<'static> + 'static> PooledProcess<P> {
    pub fn use_(&self) -> Arc<dbus::RemoteProcess<P>> {
        *self.last_use.lock().unwrap() = Instant::now();
        self.process.clone()
    }
}

#[derive(Debug)]
pub struct Pool {
    loaders: AsyncMutex<
        BTreeMap<config::ConfigEntryHash, Arc<PooledProcess<dbus::LoaderProxy<'static>>>>,
    >,
    editors: AsyncMutex<
        BTreeMap<config::ConfigEntryHash, Arc<PooledProcess<dbus::EditorProxy<'static>>>>,
    >,
    loader_retention_time: Duration,
}

impl Pool {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            loader_retention_time: Duration::from_secs(60),
            loaders: Default::default(),
            editors: Default::default(),
        })
    }

    pub fn global() -> Arc<Self> {
        DEFAULT_POOL.clone()
    }

    pub async fn get_loader(
        &self,
        loader_config: config::ImageLoaderConfig,
        sandbox_mechanism: SandboxMechanism,
        base_dir: Option<PathBuf>,
        cancellable: &gio::Cancellable,
        loader_alive: std::sync::Weak<()>,
    ) -> Result<Arc<PooledProcess<dbus::LoaderProxy<'static>>>, Error> {
        let pooled_loaders = &self.loaders;

        let pp = self
            .get_process(
                pooled_loaders,
                ConfigEntry::Loader(loader_config.clone()),
                sandbox_mechanism,
                base_dir,
                cancellable,
                loader_alive,
            )
            .await?;

        Ok(pp)
    }

    pub async fn get_editor(
        &self,
        editor_config: config::ImageEditorConfig,
        sandbox_mechanism: SandboxMechanism,
        base_dir: Option<PathBuf>,
        cancellable: &gio::Cancellable,
        editor_alive: std::sync::Weak<()>,
    ) -> Result<Arc<PooledProcess<dbus::EditorProxy<'static>>>, Error> {
        let pooled_editors = &self.editors;

        let pp = self
            .get_process(
                pooled_editors,
                ConfigEntry::Editor(editor_config.clone()),
                sandbox_mechanism,
                base_dir,
                cancellable,
                editor_alive,
            )
            .await?;

        Ok(pp)
    }

    pub async fn get_process<P: ZbusProxy<'static> + 'static>(
        &self,
        pooled_processes: &AsyncMutex<BTreeMap<ConfigEntryHash, Arc<PooledProcess<P>>>>,
        config: config::ConfigEntry,
        sandbox_mechanism: SandboxMechanism,
        base_dir: Option<PathBuf>,
        cancellable: &gio::Cancellable,
        alive: std::sync::Weak<()>,
    ) -> Result<Arc<PooledProcess<P>>, Error> {
        let mut pooled_processes = pooled_processes.lock().await;

        let config_hash = config.hash_value(base_dir.clone());

        let pooled_process = pooled_processes.get(&config_hash).cloned();

        if let Some(loader) = pooled_process {
            if loader.process.process_disconnected.load(Ordering::Relaxed) {
                tracing::debug!(
                    "Existing loader in pool is disconnected. Dropping existing loader."
                );
            } else {
                tracing::debug!("Using existing loader from pool.");
                return Ok(loader);
            }
        }

        tracing::debug!("Creating new loader.");

        let process_cancellable = gio::Cancellable::new();
        let Some(process_cancellable_tie) = cancellable.connect_cancelled(glib::clone!(
            #[weak]
            process_cancellable,
            move |_| process_cancellable.cancel()
        )) else {
            return Err(Error::Canceled(None));
        };

        let process = Arc::new(
            dbus::RemoteProcess::new(
                config.clone(),
                sandbox_mechanism,
                base_dir,
                &process_cancellable,
            )
            .await?,
        );

        cancellable.disconnect_cancelled(process_cancellable_tie);

        let pp = Arc::new(PooledProcess {
            last_use: Mutex::new(Instant::now()),
            process: process.clone(),
            users: vec![alive],
        });

        pooled_processes.insert(config_hash, pp.clone());

        Ok(pp)
    }

    pub async fn clean_loaders(self: Arc<Self>) {
        let mut loaders = self.loaders.lock().await;

        loaders.retain(|cfg, loader| {
            let drop = loader.users.iter().all(|x| x.strong_count() == 0)
                && loader.last_use.lock().unwrap().elapsed() > self.loader_retention_time;
            if drop {
                tracing::debug!(
                    "Dropping loader {:?} {}",
                    cfg.exec(),
                    Arc::strong_count(&loader.process)
                )
            }
            !drop
        });
    }
}