use async_timing_util::{unix_timestamp_ms, DAY_MS};
use chrono::{DateTime, Datelike, NaiveDateTime, Utc};
use ftp::{types::Result, FtpStream};
use run_command::{run_command_pipe_to_terminal, CommandOutput};
use std::{fs, io::Write, process};
use crate::{
config::State,
types::{Exchange, MarketType},
};
pub fn decompress_zstd(path: &str) -> CommandOutput {
let command = format!("unzstd {path} -f");
run_command_pipe_to_terminal(&command)
}
pub fn get_target_path(state: &State) -> (String, Option<String>) {
let State {
crawler_type,
exchange,
market,
symbol,
month: year_month,
day,
raw,
latest_day,
latest_month,
..
} = &state;
let (year_month, day) = if *latest_month {
(last_full_day_month(prev_utc_midnight()), None)
} else if *latest_day {
let prev_midnight = prev_utc_midnight();
(
last_full_day_month(prev_midnight),
Some(last_full_day(prev_midnight)),
)
} else {
match year_month {
Some(year_month) => (year_month.clone(), day.clone()),
None => {
eprintln!("\n🚨 ERROR: must provide --month if not using --latest-month or --latest-day");
process::exit(1);
},
}
};
let crawler_type = if *raw {
format!("raw-{}", crawler_type.to_string().replace("_", "-"))
} else {
state.crawler_type.to_string().replace("_", "-")
};
let folder = format!("{exchange}/{market}/{symbol}/{year_month}/{crawler_type}");
let file = if let Some(day) = day {
let day = if day.len() == 1 {
format!("0{day}")
} else {
day
};
Some(format!(
"{exchange}.{market}.{symbol}.{crawler_type}.{year_month}-{day}.csv.zst"
))
} else {
None
};
(folder, file)
}
pub fn check_valid(state: &State) -> bool {
match state.exchange {
Exchange::Binance => match state.market {
MarketType::Spot => true,
MarketType::LinearSwap => true,
_ => {
println!("\nbinance {} unsupported", state.market);
false
}
},
Exchange::Ftx => match state.market {
MarketType::Spot => true,
MarketType::LinearSwap => true,
_ => {
println!("\nftx {} unsupported", state.market);
false
}
},
Exchange::Bybit => match state.market {
MarketType::LinearSwap => true,
MarketType::InverseSwap => true,
_ => {
println!("\nbybit {} unsupported", state.market);
false
}
},
_ => {
println!("\n{} unsupported", state.exchange);
false
}
}
}
pub fn decompress(state: &State, file: Option<String>) {
if let Some(file) = &file {
println!("\ndecompressing file\n");
let path = format!("{}/{file}", state.output);
let decompress_output = decompress_zstd(&path);
if decompress_output.success() {
let _ = fs::remove_file(path);
}
} else {
let files = fs::read_dir(format!(
"{}/{}",
state.output,
state.crawler_type.to_string().replace("_", "-")
))
.expect("failed to read files in dir");
let files: Vec<String> = files
.map(|file| {
file.expect("file error")
.path()
.to_str()
.unwrap()
.to_string()
})
.collect();
println!("\ndecompressing {} files\n", files.len());
for file in files {
let output = decompress_zstd(&file);
if output.success() {
let _ = fs::remove_file(file);
}
}
};
}
pub fn get_ftp_stream(state: &State) -> Result<FtpStream> {
let mut ftp_stream = FtpStream::connect(&state.address)?;
let password = match &state.password {
Some(password) => password,
None => {
eprintln!("🚨 ERROR: must provide password using -p flag or PASSWORD environment variable. -p flag will take precedence.");
process::exit(1);
}
};
ftp_stream.login(&state.username, password)?;
Ok(ftp_stream)
}
pub fn ftp_retr(state: &State, ftp_stream: &mut FtpStream, folder: &str, file_name: &str) {
let _ = fs::create_dir_all(format!("{}/{folder}", state.output));
let mut file = fs::File::create(format!("{}/{folder}/{file_name}", state.output))
.expect("failed to create file");
let dl = ftp_stream
.simple_retr(&format!("{folder}/{file_name}"))
.expect("failed to find remote file to download");
file.write_all(&dl.into_inner())
.expect("failed to write to file");
}
fn prev_utc_midnight() -> i64 {
let ts = unix_timestamp_ms();
(ts - (ts % DAY_MS)) as i64
}
fn last_full_day(ending_ts: i64) -> String {
let now = DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp((ending_ts - DAY_MS as i64) / 1000, 0),
Utc,
);
if now.day().to_string().len() == 1 {
format!("0{}", now.day())
} else {
now.day().to_string()
}
}
fn last_full_day_month(ending_ts: i64) -> String {
let now = DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp((ending_ts - DAY_MS as i64) / 1000, 0),
Utc,
);
let month = if now.month().to_string().len() == 1 {
format!("0{}", now.month())
} else {
now.month().to_string()
};
format!("{}-{month}", now.year())
}