use clap::Parser;
use clio::{Input, Output};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use nostr_db::{Db, Event, Filter, FromEventData};
use rayon::prelude::*;
use std::{
fs::File,
io::{BufRead, BufReader, Write},
path::{Path, PathBuf},
};
mod bench;
mod relay;
pub use bench::*;
pub use relay::*;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error(transparent)]
Db(#[from] nostr_db::Error),
#[error(transparent)]
Relay(#[from] nostr_relay::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Clio(#[from] clio::Error),
#[error("event error{0}")]
Event(String),
#[error("{0}")]
Message(String),
}
/// Result alias whose error type defaults to this module's [`Error`].
pub type Result<T, E = Error> = core::result::Result<T, E>;
// Command-line options for importing events into a database; consumed by `import_opts`.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into
// help text, which would change the generated CLI output.
#[derive(Debug, Clone, Parser)]
pub struct ImportOpts {
// Database path; `import_opts` forwards it to `import`, which opens it with `Db::open`.
#[arg(value_name = "PATH")]
pub path: PathBuf,
// Forwarded to `import` as `search`; when true, `build_note_words` is called on each
// parsed event before it is stored.
#[arg(long, value_name = "BOOL")]
pub search: bool,
// Source of the events, read line by line by `import`. Defaults to "-"
// (presumably clio's notation for stdin; confirm against the clio docs).
#[clap(value_parser, default_value = "-")]
pub input: Input,
}
// Command-line options for exporting events from a database; consumed by `export_opts`.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into
// help text, which would change the generated CLI output.
#[derive(Debug, Clone, Parser)]
pub struct ExportOpts {
// Database path; `export_opts` forwards it to `count` and `export`, which open it
// with `Db::open`.
#[arg(value_name = "PATH")]
pub path: PathBuf,
// Filter selecting the events to export; defaults to "{}".
#[arg(short = 'f', long, value_name = "FILTER", default_value = "{}")]
pub filter: Filter,
// When given, `export_opts` writes this value into `filter.desc` before exporting.
#[arg(long, value_name = "BOOL")]
pub desc: Option<bool>,
// Destination that `export` writes one event per line to. Defaults to "-"
// (presumably clio's notation for stdout; confirm against the clio docs).
#[clap(value_parser, default_value = "-")]
pub output: Output,
}
// Command-line options for deleting events from a database.
// NOTE(review): no consumer is visible in this file; the fields mirror the parameters
// of `delete` (path, filter, dry_run), so they are presumably forwarded to it.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into
// help text, which would change the generated CLI output.
#[derive(Debug, Clone, Parser)]
pub struct DeleteOpts {
// Database path.
#[arg(value_name = "PATH")]
pub path: PathBuf,
// Filter selecting the events to delete; defaults to "{}".
#[arg(short = 'f', long, value_name = "FILTER", default_value = "{}")]
pub filter: Filter,
// Flag for a dry run (see the `dry_run` parameter of `delete`).
#[arg(long)]
pub dry_run: bool,
}
/// Runs an import described by `opts` and returns the count reported by [`import`].
///
/// When the input is a local path, its lines are counted first and a progress
/// bar of that length is shown while importing; the bar position is refreshed
/// every 1000 lines. For any other input the import runs without a bar.
///
/// # Errors
///
/// Propagates failures from counting the input lines and from [`import`].
pub fn import_opts(opts: ImportOpts) -> anyhow::Result<usize> {
    /// Calls [`import`] with a fixed `batch` of 10000.
    fn run_import_opts<F: Fn(usize)>(opts: ImportOpts, f: F) -> anyhow::Result<usize> {
        Ok(import(&opts.path, opts.input, 10000, opts.search, f)?)
    }

    // The borrow of `opts.input` is confined to this block so `opts` can be
    // moved into the import afterwards.
    let line_total = {
        let source = opts.input.path();
        if source.is_local() {
            Some(count_lines(source.path())? as u64)
        } else {
            None
        }
    };

    match line_total {
        Some(lines) => {
            let bar = create_pb(lines);
            let imported = run_import_opts(opts, |index| {
                if index % 1000 == 0 {
                    bar.set_position(index as u64);
                }
            })?;
            bar.finish_with_message("finished");
            Ok(imported)
        }
        None => run_import_opts(opts, |_| {}),
    }
}
/// Counts the lines of the file at `path`.
///
/// A final line without a trailing `\n` counts as a line, and an empty file
/// has zero lines. Lines are counted on raw bytes, so no UTF-8 validation is
/// performed and no `String` is allocated per line.
///
/// # Errors
///
/// Returns the error from opening the file, or the first read error.
/// Counting the items of `BufRead::lines()` instead would count every `Err`
/// as a line and never finish on a read error that repeats forever (for
/// example a directory, on platforms where opening one succeeds).
fn count_lines<P: AsRef<Path>>(path: P) -> std::io::Result<usize> {
    let mut reader = BufReader::new(File::open(path)?);
    let mut buf = Vec::new();
    let mut lines = 0;
    // `read_until` yields 0 only at end of input; every other read is one line.
    while reader.read_until(b'\n', &mut buf)? != 0 {
        lines += 1;
        buf.clear();
    }
    Ok(lines)
}
pub fn import<F: Fn(usize)>(
path: &PathBuf,
input: Input,
batch: usize,
search: bool,
f: F,
) -> Result<usize> {
let db = Db::open(path)?;
db.check_schema()?;
let reader = BufReader::new(input);
let lines = reader.lines();
let mut batches = vec![];
let mut count = 0;
fn parse_events(batches: &Vec<String>, search: bool) -> Vec<Event> {
batches
.par_iter()
.filter_map(|s| {
let event = Event::from_data(s.as_bytes());
match event {
Ok(mut event) => {
if search {
event.build_note_words();
}
Some(event)
}
Err(e) => {
println!("error: {} {}", s, e);
None
}
}
})
.collect()
}
let parse_batch = 30;
let mut writer = db.writer()?;
for item in lines.enumerate() {
let line = item.1?;
let index = item.0;
if index > 0 && index % parse_batch == 0 {
let events = parse_events(&batches, search);
for event in events {
db.put(&mut writer, event)?;
count += 1;
}
batches.clear();
}
batches.push(line);
if index > 0 && index % batch == 0 {
db.commit(writer)?;
writer = db.writer()?;
}
f(index);
}
db.commit(writer)?;
db.batch_put(parse_events(&batches, search))?;
count += batches.len();
db.flush()?;
Ok(count)
}
/// Creates a progress bar of length `total` using this module's template.
///
/// The template shows a spinner, the elapsed time, a wide bar, `pos/len` and a
/// custom `eta` key rendered as seconds with one decimal place.
fn create_pb(total: u64) -> ProgressBar {
    const TEMPLATE: &str =
        "{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})";

    let bar = ProgressBar::new(total);
    let style = ProgressStyle::with_template(TEMPLATE)
        .unwrap()
        .with_key(
            "eta",
            |state: &ProgressState, out: &mut dyn std::fmt::Write| {
                let seconds = state.eta().as_secs_f64();
                write!(out, "{:.1}s", seconds).unwrap()
            },
        )
        .progress_chars("#>-");
    bar.set_style(style);
    bar
}
/// Runs an export described by `opts` and returns the count reported by [`export`].
///
/// The filter is finalized first (`build_words`, plus the optional `desc`
/// override). When the output is a local path, the matching events are then
/// counted with [`count`] and a progress bar of that length is shown; the bar
/// position is refreshed every 1000 events. For any other output the export
/// runs without a bar.
///
/// # Errors
///
/// Propagates failures from [`count`] and [`export`].
pub fn export_opts(mut opts: ExportOpts) -> anyhow::Result<usize> {
    /// Calls [`export`] with the already prepared options.
    fn run_export_opts<F: Fn(usize)>(opts: ExportOpts, f: F) -> anyhow::Result<usize> {
        Ok(export(&opts.path, opts.output, &opts.filter, f)?)
    }

    // Prepare the filter once, up front, so the progress total below is
    // computed from exactly the same filter the export itself uses.
    opts.filter.build_words();
    if let Some(desc) = opts.desc {
        opts.filter.desc = desc;
    }

    if opts.output.path().is_local() {
        let total_size = count(&opts.path, &opts.filter)?;
        let pb = create_pb(total_size);
        let total = run_export_opts(opts, |c| {
            if c % 1000 == 0 {
                pb.set_position(c as u64);
            }
        })?;
        pb.finish_with_message("finished");
        Ok(total)
    } else {
        run_export_opts(opts, |_| {})
    }
}
/// Returns the first component of the size reported for the events matching
/// `filter` in the database at `path`.
///
/// # Errors
///
/// Fails when the database cannot be opened, the reader or the iterator cannot
/// be created, or the size query fails.
pub fn count(path: &PathBuf, filter: &Filter) -> Result<u64> {
    let db = Db::open(path)?;
    let txn = db.reader()?;
    let size = db.iter::<String, _>(&txn, filter)?.size()?;
    Ok(size.0)
}
/// Writes every event matching `filter` from the database at `path` to
/// `output`, one event per line.
///
/// * `path` - database location, opened with `Db::open`.
/// * `output` - destination; it is finished (`Output::finish`) before returning.
/// * `filter` - selects the events to export.
/// * `f` - progress callback, called with the running (1-based) number of
///   events after each one has been handed to the writer.
///
/// Returns the number of exported events.
///
/// # Errors
///
/// Fails on database errors, on an event the iterator reports as an error,
/// and on I/O errors while writing, flushing or finishing `output`.
pub fn export<F: Fn(usize)>(
    path: &PathBuf,
    mut output: Output,
    filter: &Filter,
    f: F,
) -> Result<usize> {
    let db = Db::open(path)?;
    let reader = db.reader()?;
    let iter = db.iter::<String, _>(&reader, filter)?;
    let mut count = 0;
    {
        // Buffer the writes so each event does not become its own write call
        // on the underlying output.
        let mut sink = std::io::BufWriter::new(&mut output);
        for event in iter {
            let json: String = event?;
            sink.write_all(json.as_bytes())?;
            sink.write_all(b"\n")?;
            count += 1;
            f(count);
        }
        // Flush explicitly: errors during an implicit flush on drop are lost.
        sink.flush()?;
    }
    output.finish()?;
    Ok(count)
}
/// Deletes the events matching `filter` from the database at `path`.
///
/// The matching ids are collected first through a reader, as `count` and
/// `export` do. With `dry_run` set, nothing is modified and no write
/// transaction is opened.
///
/// Returns the number of matching events. For a dry run this is the number
/// that would have been deleted.
///
/// # Errors
///
/// Fails on any database error while opening, iterating, deleting or committing.
pub fn delete(path: &PathBuf, filter: &Filter, dry_run: bool) -> Result<usize> {
    let db = Db::open(path)?;

    // Read-only pass: gather the ids before any mutation starts.
    let reader = db.reader()?;
    let iter = db.iter::<Vec<u8>, _>(&reader, filter)?;
    let ids = iter.collect::<Result<Vec<Vec<u8>>, nostr_db::Error>>()?;
    drop(reader);

    if dry_run {
        return Ok(ids.len());
    }

    let mut writer = db.writer()?;
    for id in &ids {
        db.del(&mut writer, id)?;
    }
    db.commit(writer)?;
    Ok(ids.len())
}