glycin 3.1.0

Sandboxed image decoding
Documentation
static DEFAULT_POOL: LazyLock<Arc<Pool>> = LazyLock::new(|| Arc::new(Pool::default()));

use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::{Arc, LazyLock, Mutex};
use std::time::{Duration, Instant};
use std::usize;

use gio::glib;
use gio::prelude::*;

use crate::config::{ConfigEntry, ConfigEntryHash};
use crate::dbus::ZbusProxy;
use crate::util::{AsyncMutex, TimerHandle, spawn_timeout};
use crate::{Error, SandboxMechanism, config, dbus};

#[derive(Debug)]
pub struct PooledProcess<P: ZbusProxy<'static> + 'static> {
    last_use: Mutex<Instant>,
    _timeout: Arc<Mutex<Option<TimerHandle>>>,
    process: Arc<dbus::RemoteProcess<P>>,
    useage_tracker: Mutex<std::sync::Weak<UsageTracker>>,
}

#[derive(Debug)]
pub struct UsageTracker {
    pool: Arc<Pool>,
    timeout: Arc<Mutex<Option<TimerHandle>>>,
}

impl UsageTracker {
    pub fn new(pool: Arc<Pool>, timeout: Arc<Mutex<Option<TimerHandle>>>) -> Self {
        Self { pool, timeout }
    }
}

impl Drop for UsageTracker {
    fn drop(&mut self) {
        tracing::trace!("One process occupation dropped");
        let pool = self.pool.clone();

        *self.timeout.lock().unwrap() = Some(spawn_timeout(
            self.pool.config.loader_retention_time,
            async {
                pool.clean_loaders().await;
            },
        ));
    }
}

impl<P: ZbusProxy<'static> + 'static> PooledProcess<P> {
    pub fn use_(&self) -> Arc<dbus::RemoteProcess<P>> {
        tracing::trace!("Using pooled process");
        *self.last_use.lock().unwrap() = Instant::now();
        self.process.clone()
    }

    pub fn n_users(&self) -> usize {
        self.useage_tracker.lock().unwrap().strong_count()
    }
}

#[derive(Debug, Default)]
pub struct Pool {
    loaders: AsyncMutex<
        BTreeMap<config::ConfigEntryHash, Vec<Arc<PooledProcess<dbus::LoaderProxy<'static>>>>>,
    >,
    editors: AsyncMutex<
        BTreeMap<config::ConfigEntryHash, Vec<Arc<PooledProcess<dbus::EditorProxy<'static>>>>>,
    >,
    config: PoolConfig,
}

#[derive(Debug)]
pub struct PoolConfig {
    loader_retention_time: Duration,
    max_parallel_operations: usize,
}

impl Default for PoolConfig {
    fn default() -> Self {
        Self {
            loader_retention_time: Duration::from_secs(30),
            max_parallel_operations: usize::MAX,
        }
    }
}

impl PoolConfig {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn max_parallel_operations(&mut self, max_parallel_operations: usize) -> &mut Self {
        if max_parallel_operations == 0 {
            self.max_parallel_operations = usize::MAX;
        } else {
            self.max_parallel_operations = max_parallel_operations;
        }
        self
    }
}

impl Pool {
    pub fn new(config: PoolConfig) -> Arc<Self> {
        let mut pool = Self::default();
        pool.config = config;

        Arc::new(pool)
    }

    pub fn global() -> Arc<Self> {
        DEFAULT_POOL.clone()
    }

    pub(crate) async fn get_loader(
        self: Arc<Self>,
        loader_config: config::ImageLoaderConfig,
        sandbox_mechanism: SandboxMechanism,
        base_dir: Option<PathBuf>,
        cancellable: &gio::Cancellable,
    ) -> Result<
        (
            Arc<PooledProcess<dbus::LoaderProxy<'static>>>,
            Arc<UsageTracker>,
        ),
        Error,
    > {
        let pooled_loaders = &self.loaders;

        let pp = self
            .clone()
            .get_process(
                pooled_loaders,
                ConfigEntry::Loader(loader_config.clone()),
                sandbox_mechanism,
                base_dir,
                cancellable,
            )
            .await?;

        Ok(pp)
    }

    pub(crate) async fn get_editor(
        self: Arc<Self>,
        editor_config: config::ImageEditorConfig,
        sandbox_mechanism: SandboxMechanism,
        base_dir: Option<PathBuf>,
        cancellable: &gio::Cancellable,
    ) -> Result<
        (
            Arc<PooledProcess<dbus::EditorProxy<'static>>>,
            Arc<UsageTracker>,
        ),
        Error,
    > {
        let pooled_editors = &self.editors;

        let pp = self
            .clone()
            .get_process(
                pooled_editors,
                ConfigEntry::Editor(editor_config.clone()),
                sandbox_mechanism,
                base_dir,
                cancellable,
            )
            .await?;

        Ok(pp)
    }

    pub(crate) async fn get_process<P: ZbusProxy<'static> + 'static>(
        self: Arc<Self>,
        pooled_processes: &AsyncMutex<BTreeMap<ConfigEntryHash, Vec<Arc<PooledProcess<P>>>>>,
        config: config::ConfigEntry,
        sandbox_mechanism: SandboxMechanism,
        base_dir: Option<PathBuf>,
        cancellable: &gio::Cancellable,
    ) -> Result<(Arc<PooledProcess<P>>, Arc<UsageTracker>), Error> {
        let config_hash = config.hash_value(base_dir.clone(), sandbox_mechanism);
        let mut pooled_processes = pooled_processes.lock().await;
        let pooled_processes = pooled_processes.entry(config_hash).or_default();

        for process in pooled_processes.iter() {
            if process.process.process_disconnected.load(Ordering::Relaxed) {
                tracing::debug!("Existing loader/editor in pool is disconnected. Trying next.");
            } else if process.n_users() >= self.config.max_parallel_operations {
                tracing::debug!(
                    "Existing loader/editor in pool is at 'max_parallel_operations'. Trying next."
                );
            } else {
                tracing::debug!("Using existing loader from pool.");
                let mut current_usage_tracker = process.useage_tracker.lock().unwrap();
                let usage_tracker = current_usage_tracker.upgrade().unwrap_or_else(|| {
                    Arc::new(UsageTracker::new(self.clone(), process._timeout.clone()))
                });
                *current_usage_tracker = Arc::downgrade(&usage_tracker);
                return Ok((process.clone(), usage_tracker));
            }
        }

        tracing::debug!("No existing loader/editor in pool. Spawning new one.");

        let process_cancellable = gio::Cancellable::new();
        let Some(process_cancellable_tie) = cancellable.connect_cancelled(glib::clone!(
            #[weak]
            process_cancellable,
            move |_| process_cancellable.cancel()
        )) else {
            return Err(Error::Canceled(None));
        };

        let process = Arc::new(
            dbus::RemoteProcess::new(
                config.clone(),
                sandbox_mechanism,
                base_dir,
                &process_cancellable,
            )
            .await?,
        );

        cancellable.disconnect_cancelled(process_cancellable_tie);

        let _timeout = Arc::new(Mutex::new(None));

        let usage_tracker = Arc::new(UsageTracker::new(self.clone(), _timeout.clone()));

        let pp = Arc::new(PooledProcess {
            last_use: Mutex::new(Instant::now()),
            _timeout,
            process: process.clone(),
            useage_tracker: Mutex::new(Arc::downgrade(&usage_tracker)),
        });

        pooled_processes.push(pp.clone());

        Ok((pp, usage_tracker))
    }

    pub(crate) async fn clean_loaders(self: Arc<Self>) {
        tracing::debug!("Cleaning up loaders");
        let mut loader_map = self.loaders.lock().await;

        for (cfg, loaders) in loader_map.iter_mut() {
            loaders.retain(|loader| {
                let n_users = loader.n_users();
                let idle = loader.last_use.lock().unwrap().elapsed();
                let drop = n_users == 0 && idle > self.config.loader_retention_time;

                tracing::debug!(
                    "Loader {:?}: drop {drop} users {n_users} (max {}), idle {idle:?} (max {:?})",
                    cfg.exec(),
                    self.config.max_parallel_operations,
                    self.config.loader_retention_time
                );

                if drop {
                    tracing::debug!(
                        "Dropping loader {:?} {}",
                        cfg.exec(),
                        Arc::strong_count(&loader.process)
                    )
                }
                !drop
            });
        }
    }
}