mrdocument 0.5.3

Automatic PDF transcription and classification via OpenAI
Documentation
use crate::chatgpt::query_ai;
use crate::error::{Error, Result};
use crate::file_info::FileInfo;
use crate::file_object::FileObject;
use crate::paths::Location;
use crate::pdf::update_metadata;
use crate::profile::Profile;
use notify::event::CreateKind;
use notify::{Event, EventKind};
use std::future::Future;
use std::marker::Send;
use std::path::PathBuf;
use tokio::fs;
use tokio::fs::create_dir_all;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
use tokio::time::{sleep, Duration};

pub trait EventHandler: Send + 'static {
    fn handle_event(&mut self, event: Event) -> impl Future<Output = ()> + Send;
    fn on_start(&mut self) -> impl Future<Output = ()> + Send {
        async {}
    }
    fn on_stop(self) -> impl Future<Output = ()> + Send
    where
        Self: Sized,
    {
        async {}
    }
}

pub struct Handler {
    profile: Profile,
    tasks: JoinSet<()>,
    concurrency: u8,
}

impl EventHandler for Handler {
    async fn handle_event(&mut self, event: Event) {
        match event {
            Event {
                kind: EventKind::Create(CreateKind::Any),
                paths,
                ..
            }
            | Event {
                kind: EventKind::Create(CreateKind::File),
                paths,
                ..
            } => {
                let existing_paths: Vec<_> =
                    paths.into_iter().filter(|path| path.is_file()).collect();
                for path in existing_paths {
                    self.handle_file(path).await;
                }
            }
            _ => {
                log::trace!("Ignoring event: {event:?}");
            }
        };
    }

    async fn on_stop(self) {
        self.wait().await;
    }
}

impl Handler {
    pub async fn new(profile: Profile, concurrency: u8) -> Result<Self> {
        create_dir_all(profile.paths.make_root(Location::Inbox)).await?;
        create_dir_all(profile.paths.make_root(Location::Outbox)).await?;
        create_dir_all(profile.paths.make_root(Location::Transit)).await?;
        create_dir_all(profile.paths.make_root(Location::Processed)).await?;
        create_dir_all(profile.paths.make_root(Location::Error)).await?;
        Ok(Handler {
            profile,
            tasks: JoinSet::new(),
            concurrency,
        })
    }

    async fn handle_file(&mut self, filepath: PathBuf) {
        while self.tasks.len() >= self.concurrency.into() {
            self.tasks
                .join_next()
                .await
                .expect("Cannot be empty")
                .expect("Task should not panic");
        }
        self.tasks.spawn(Handler::handle_file_entry_point(
            self.profile.clone(),
            filepath.clone(),
        ));
    }

    async fn wait(self) {
        self.tasks.join_all().await;
    }

    async fn handle_file_entry_point(profile: Profile, filepath: PathBuf) {
        log::info!("Processing {filepath:?}");
        match Handler::handle_file_transit(profile, filepath.clone()).await {
            Ok(_) => {
                log::info!("Processed {:?}", filepath);
            }
            Err(err) => {
                log::error!("Unable to process file: {:?}: {}", filepath, err);
            }
        }
    }

    async fn handle_file_transit(profile: Profile, filepath: PathBuf) -> Result<()> {
        let mut file = FileObject::new(profile.paths.clone(), filepath)?;
        log::debug!("Processing as {file:?}");
        match Handler::handle_file_processing(profile, &mut file).await {
            Ok(_) => Ok(()),
            Err(err) => {
                if let Err(err) = file.rename(Location::Error).await {
                    log::error!("Unable to move file to error location: {:?}: {}", file, err);
                }
                Err(err)
            }
        }
    }

    async fn handle_file_processing(profile: Profile, file: &mut FileObject) -> Result<()> {
        log::debug!("Waiting for file");
        sleep(Duration::from_secs(1)).await;
        FileInfo::new(file.get_path())?;
        Handler::wait_for_document(file).await?;
        file.rename(Location::Transit).await?;

        let file_info = FileInfo::new(file.get_path())?;
        let (classes, sources) = Handler::determine_classes_sources(&profile).await?;
        let document_data = query_ai(profile.chatgpt, file_info, classes, sources).await?;
        let dst_path_pdf = file
            .make_path_with_new_filename(
                Location::Outbox,
                document_data.make_path(),
                document_data.make_filename("pdf"),
            )
            .await?;
        update_metadata(file.get_path(), dst_path_pdf, &document_data)
            .await
            .map(|_| ())?;

        if let Some(ref content) = document_data.content {
            let content_path = file
                .make_path_with_new_filename(
                    Location::Outbox,
                    document_data.make_path(),
                    document_data.make_filename("content"),
                )
                .await?;
            let mut out = fs::File::create(content_path).await?;
            out.write_all(content.as_bytes()).await?;
        }
        let summary_path = file
            .make_path_with_new_filename(
                Location::Outbox,
                document_data.make_path(),
                document_data.make_filename("summary"),
            )
            .await?;
        let mut out = fs::File::create(summary_path).await?;
        out.write_all(document_data.summary.as_bytes()).await?;

        file.rename(Location::Processed).await?;

        Ok(())
    }

    async fn determine_classes_sources(profile: &Profile) -> Result<(Vec<String>, Vec<String>)> {
        let path = profile.paths.make_root(Location::Outbox);
        let mut first_level_dirs = Vec::new();
        let mut second_level_dirs = Vec::new();

        let mut entries = fs::read_dir(&path).await?;
        while let Some(entry) = entries.next_entry().await? {
            let dir_path = entry.path();
            let metadata = entry.metadata().await?;
            if metadata.is_dir() {
                if let Some(dir_name) = dir_path.file_name() {
                    first_level_dirs.push(dir_name.to_string_lossy().into_owned());
                }

                let mut sub_entries = fs::read_dir(&dir_path).await?;
                while let Some(sub_entry) = sub_entries.next_entry().await? {
                    let sub_dir_path = sub_entry.path();
                    let sub_metadata = sub_entry.metadata().await?;
                    if sub_metadata.is_dir() {
                        if let Some(sub_dir_name) = sub_dir_path.file_name() {
                            second_level_dirs.push(sub_dir_name.to_string_lossy().into_owned());
                        }
                    }
                }
            }
        }

        Ok((first_level_dirs, second_level_dirs))
    }

    async fn wait_for_document(file: &FileObject) -> Result<()> {
        let mut i = 6;
        while let Err(_) = lopdf::Document::load(file.get_path()).await {
            if tokio::fs::metadata(file.get_path()).await.is_err() {
                return Err(Error::FileDisappearedError(file.get_path()));
            }
            log::info!("waiting for document to become ready: {file:?}");
            sleep(Duration::from_secs(10)).await;
            i = i - 1;
            if i == 0 {
                return Err(Error::NotValidPdfError);
            }
        }
        Ok(())
    }
}