crawler_data_client 0.0.9

client for programmatic download of crawler data
use async_timing_util::{unix_timestamp_ms, DAY_MS};
use chrono::{DateTime, Datelike, NaiveDateTime, Utc};
use ftp::{types::Result, FtpStream};
use run_command::{run_command_pipe_to_terminal, CommandOutput};
use std::{fs, io::Write, process};

use crate::{
    config::State,
    types::{Exchange, MarketType},
};

pub fn decompress_zstd(path: &str) -> CommandOutput {
    let command = format!("unzstd {path} -f");
    run_command_pipe_to_terminal(&command)
}

pub fn get_target_path(state: &State) -> (String, Option<String>) {
    let State {
        crawler_type,
        exchange,
        market,
        symbol,
        month: year_month,
        day,
        raw,
        latest_day,
        latest_month,
        ..
    } = &state;
    let (year_month, day) = if *latest_month {
        (last_full_day_month(prev_utc_midnight()), None)
    } else if *latest_day {
        let prev_midnight = prev_utc_midnight();
        (
            last_full_day_month(prev_midnight),
            Some(last_full_day(prev_midnight)),
        )
    } else {
        match year_month {
            Some(year_month) => (year_month.clone(), day.clone()),
            None => {
                eprintln!("\n🚨 ERROR: must provide --month if not using --latest-month or --latest-day");
                process::exit(1);
            },
        }
    };
    let crawler_type = if *raw {
        format!("raw-{}", crawler_type.to_string().replace("_", "-"))
    } else {
        state.crawler_type.to_string().replace("_", "-")
    };

    let folder = format!("{exchange}/{market}/{symbol}/{year_month}/{crawler_type}");

    let file = if let Some(day) = day {
        let day = if day.len() == 1 {
            format!("0{day}")
        } else {
            day
        };
        Some(format!(
            "{exchange}.{market}.{symbol}.{crawler_type}.{year_month}-{day}.csv.zst"
        ))
    } else {
        None
    };

    (folder, file)
}

pub fn check_valid(state: &State) -> bool {
    match state.exchange {
        Exchange::Binance => match state.market {
            MarketType::Spot => true,
            MarketType::LinearSwap => true,
            _ => {
                println!("\nbinance {} unsupported", state.market);
                false
            }
        },
        Exchange::Ftx => match state.market {
            MarketType::Spot => true,
            MarketType::LinearSwap => true,
            _ => {
                println!("\nftx {} unsupported", state.market);
                false
            }
        },
        Exchange::Bybit => match state.market {
            MarketType::LinearSwap => true,
            MarketType::InverseSwap => true,
            _ => {
                println!("\nbybit {} unsupported", state.market);
                false
            }
        },
        _ => {
            println!("\n{} unsupported", state.exchange);
            false
        }
    }
}

pub fn decompress(state: &State, file: Option<String>) {
    if let Some(file) = &file {
        println!("\ndecompressing file\n");
        let path = format!("{}/{file}", state.output);
        let decompress_output = decompress_zstd(&path);
        if decompress_output.success() {
            let _ = fs::remove_file(path);
        }
    } else {
        let files = fs::read_dir(format!(
            "{}/{}",
            state.output,
            state.crawler_type.to_string().replace("_", "-")
        ))
        .expect("failed to read files in dir");
        let files: Vec<String> = files
            .map(|file| {
                file.expect("file error")
                    .path()
                    .to_str()
                    .unwrap()
                    .to_string()
            })
            .collect();
        println!("\ndecompressing {} files\n", files.len());
        for file in files {
            let output = decompress_zstd(&file);
            if output.success() {
                let _ = fs::remove_file(file);
            }
        }
    };
}

pub fn get_ftp_stream(state: &State) -> Result<FtpStream> {
    let mut ftp_stream = FtpStream::connect(&state.address)?;
    let password = match &state.password {
        Some(password) => password,
        None => {
            eprintln!("🚨 ERROR: must provide password using -p flag or PASSWORD environment variable. -p flag will take precedence.");
            process::exit(1);
        }
    };
    ftp_stream.login(&state.username, password)?;
    Ok(ftp_stream)
}

pub fn ftp_retr(state: &State, ftp_stream: &mut FtpStream, folder: &str, file_name: &str) {
    let _ = fs::create_dir_all(format!("{}/{folder}", state.output));
    let mut file = fs::File::create(format!("{}/{folder}/{file_name}", state.output))
        .expect("failed to create file");
    // let mut file = LineWriter::new(file);
    let dl = ftp_stream
        .simple_retr(&format!("{folder}/{file_name}"))
        .expect("failed to find remote file to download");
    file.write_all(&dl.into_inner())
        .expect("failed to write to file");
}

fn prev_utc_midnight() -> i64 {
    let ts = unix_timestamp_ms();
    (ts - (ts % DAY_MS)) as i64
}

fn last_full_day(ending_ts: i64) -> String {
    let now = DateTime::<Utc>::from_utc(
        NaiveDateTime::from_timestamp((ending_ts - DAY_MS as i64) / 1000, 0),
        Utc,
    );
    if now.day().to_string().len() == 1 {
        format!("0{}", now.day())
    } else {
        now.day().to_string()
    }
}

fn last_full_day_month(ending_ts: i64) -> String {
    let now = DateTime::<Utc>::from_utc(
        NaiveDateTime::from_timestamp((ending_ts - DAY_MS as i64) / 1000, 0),
        Utc,
    );
    let month = if now.month().to_string().len() == 1 {
        format!("0{}", now.month())
    } else {
        now.month().to_string()
    };
    format!("{}-{month}", now.year())
}