rust-cli-pomodoro 1.4.6

rust-cli-pomodoro manages your time!
use chrono::Utc;
use clap_complete::generate;
use gluesql::prelude::{Glue, MemoryStorage};
use std::collections::HashMap;
use std::error::Error;
use std::io::{self};
use std::sync::{Arc, Mutex};
use tokio::time::sleep;
use tokio::{net::UnixDatagram, sync::mpsc};
use tokio::{sync::mpsc::Sender, task::JoinHandle};

mod command;
mod database;
mod notification;
use database as db;
mod configuration;
mod error;
mod ipc;
mod line_handler;
mod logging;
mod report;

use crate::error::ConfigurationError;
use crate::ipc::{create_client_uds, create_server_uds, Bincodec, MessageRequest, MessageResponse};
use crate::notification::archived_notification;
use crate::notification::notify::{notify_break, notify_work};
use crate::notification::Notification;
use crate::{
    command::{handler, util, CommandType},
    ipc::{get_uds_address, UdsType},
};
use crate::{
    configuration::{get_configuration, Configuration},
    ipc::UdsMessage,
};

#[macro_use]
extern crate log;

// key: notification id, value: spawned notification task
pub type TaskMap = HashMap<u16, JoinHandle<()>>;
pub type ArcGlue = Arc<Mutex<Glue<MemoryStorage>>>;
pub type ArcTaskMap = Arc<Mutex<TaskMap>>;

#[derive(Debug)]
pub struct UserInput {
    pub input: String,
    // pub oneshot_tx: oneshot::Sender<String>,
    pub source: InputSource,
}

#[derive(Debug)]
pub enum InputSource {
    StandardInput,
    UnixDomainSocket,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    logging::initialize_logging();
    debug!("debug test, start pomodoro...");

    let command_type = detect_command_type().await?;

    match command_type {
        CommandType::StartUp(config) => {
            info!("start pomodoro...");
            debug!("CommandType::StartUp");

            let glue = initialize_db().await;
            let mut id_manager: u16 = 1;
            let hash_map: Arc<Mutex<TaskMap>> = Arc::new(Mutex::new(HashMap::new()));
            let (user_input_tx, mut user_input_rx) = mpsc::channel::<UserInput>(64);

            let stdin_tx = user_input_tx.clone();
            // TODO(young): handle tokio::spawn return value nicely so that we can use `?` inside
            let _input_handle = line_handler::handle(stdin_tx);

            // handle uds
            let uds_input_tx = user_input_tx.clone();

            let server_uds_option = create_server_uds().await.unwrap();
            let server_tx = match server_uds_option {
                Some(uds) => {
                    let server_uds = Arc::new(uds);
                    let (server_rx, server_tx) = (server_uds.clone(), server_uds.clone());
                    let _uds_input_handle =
                        spawn_uds_input_handler(uds_input_tx, server_tx, server_rx);

                    Some(server_uds)
                }
                None => None,
            };

            // TODO(young) handle `rx.recv().await` returns None case
            // TODO(young): handle tokio::spawn return value nicely so that we can use `?` inside
            while let Some(user_input) = user_input_rx.recv().await {
                // extract input
                let input = user_input.input.as_str();
                debug!("input: {:?}", input);

                // handle input
                match handler::user_input::handle(input, &mut id_manager, &hash_map, &glue, &config)
                    .await
                {
                    Ok(mut output) => match user_input.source {
                        InputSource::StandardInput => {}
                        InputSource::UnixDomainSocket => {
                            if let Some(ref server_tx) = server_tx {
                                let client_addr = get_uds_address(UdsType::Client);
                                ipc::send_to(
                                    server_tx,
                                    client_addr,
                                    MessageResponse::new(output.take_body())
                                        .encode()?
                                        .as_slice(),
                                )
                                .await;
                            }
                        }
                    },
                    Err(e) => {
                        println!("There was an error analyzing the input: {}", e);

                        match user_input.source {
                            InputSource::StandardInput => {}
                            InputSource::UnixDomainSocket => {
                                if let Some(ref server_tx) = server_tx {
                                    let client_addr = get_uds_address(UdsType::Client);
                                    ipc::send_to(
                                        server_tx,
                                        client_addr,
                                        MessageResponse::new(vec![format!(
                                            "There was an error analyzing the input: {}",
                                            e
                                        )])
                                        .encode()?
                                        .as_slice(),
                                    )
                                    .await;
                                }
                            }
                        }
                    }
                }

                debug!("input: {:?}", user_input);
                util::print_start_up();
            }
        }
        CommandType::UdsClient(matches) => {
            debug!("CommandType::UdsClient");
            let socket = create_client_uds().await?;
            handler::uds_client::handle(matches, socket).await?;
        }
        CommandType::AutoComplete(sub_matches) => {
            if sub_matches.contains_id("shell") {
                if let Some(shell) = util::parse_shell(&sub_matches) {
                    let mut main_command = command::get_main_command();
                    let bin_name = main_command.get_name().to_string();
                    let mut stdout = std::io::stdout();
                    generate(shell, &mut main_command, bin_name, &mut stdout);
                }
            } else {
                println!("no shell name was passed");
            }
        }
    }

    debug!("handle_uds_client_command called successfully");

    Ok(())
}

async fn detect_command_type() -> Result<CommandType, ConfigurationError> {
    let matches = command::get_start_and_uds_client_command().get_matches();
    debug!("handle_uds_client_command, matches: {:?}", &matches);

    let command_type = match matches.subcommand().is_none() {
        true => CommandType::StartUp(get_configuration(&matches)?),
        false => {
            if let Some(val) = matches.subcommand_matches("completion") {
                CommandType::AutoComplete(val.to_owned())
            } else {
                CommandType::UdsClient(matches)
            }
        }
    };

    Ok(command_type)
}

async fn initialize_db() -> Arc<Mutex<Glue<MemoryStorage>>> {
    let glue = Arc::new(Mutex::new(db::get_memory_glue()));
    db::initialize(glue.clone()).await;

    glue
}

// TODO(young): refactor and move to proper place
pub fn spawn_notification(
    configuration: Arc<Configuration>,
    hash_map: Arc<Mutex<TaskMap>>,
    glue: ArcGlue,
    notification: Notification,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        let (id, _, work_time, break_time, _, _, _) = notification.get_values();
        debug!("id: {}, task started", id);

        let before_start_remaining = (notification.get_start_at() - Utc::now()).num_seconds();
        let before = tokio::time::Duration::from_secs(before_start_remaining as u64);
        debug!("before_start_remaining: {:?}", before_start_remaining);
        sleep(before).await;

        if work_time > 0 {
            let wt = tokio::time::Duration::from_secs(work_time as u64 * 60);
            sleep(wt).await;
            debug!("id ({}), work time ({}) done", id, work_time);

            // TODO(young): handle notify report err
            let result = notify_work(&configuration).await;
            if let Ok(report) = result {
                info!("\n{}", report);
                println!("Notification report generated");
                util::write_output(&mut io::stdout());
            }
        }

        if break_time > 0 {
            let bt = tokio::time::Duration::from_secs(break_time as u64 * 60);
            sleep(bt).await;
            debug!("id ({}), break time ({}) done", id, break_time);

            // TODO(young): handle notify report err
            let result = notify_break(&configuration).await;
            if let Ok(report) = result {
                info!("\n{}", report);
                println!("Notification report generated");
                util::write_output(&mut io::stdout());
            }
        }

        let result = notification::delete_notification(id, hash_map, glue.clone()).await;
        if result.is_err() {
            trace!("error occurred while deleting notification");
        }

        debug!("id: {}, notification work time done!", id);
    })
}

fn spawn_uds_input_handler(
    uds_tx: Sender<UserInput>,
    server_tx: Arc<UnixDatagram>,
    server_rx: Arc<UnixDatagram>,
) -> JoinHandle<()> {
    tokio::spawn(async move {
        // TODO(young) handle result
        let rx = server_rx;
        let mut buf = vec![0u8; 256];
        debug!("rx is initialized successfully");
        loop {
            debug!("inside unix domain socket task");
            // TODO(young) handle result
            let (size, addr) = rx.recv_from(&mut buf).await.unwrap();
            debug!("size: {:?}, addr: {:?}", size, addr);

            if let Some(path) = addr.as_pathname() {
                // ignore request from unnamed address
                if path != get_uds_address(ipc::UdsType::Client).as_path() {
                    debug!("addr is different");
                    continue;
                }
            }

            let uds_message = UdsMessage::decode(&buf[..size]).unwrap();
            match uds_message {
                UdsMessage::Public(message) => {
                    let user_input: UserInput = MessageRequest::into(message);
                    debug!("user_input: {:?}", user_input);

                    uds_tx.send(user_input).await.unwrap();
                }
                UdsMessage::Internal(message) => {
                    debug!("internal_message ok, {:?}", message);
                    match message {
                        ipc::internal::Message::Ping => {
                            ipc::send_to(
                                &server_tx,
                                addr.as_pathname().unwrap().to_path_buf(),
                                ipc::internal::Message::Pong.encode().unwrap().as_slice(),
                            )
                            .await;
                        }
                        ipc::internal::Message::Pong => {}
                    }
                }
            }
        }
    })
}