osm-tag-csv-history 0.5.0

Use CSV tools to see who's mapping what in OpenStreetMap.
#[macro_use]
extern crate log;
extern crate csv;
extern crate env_logger;
extern crate osmio;
#[macro_use]
extern crate anyhow;
extern crate clap;
extern crate do_every;
extern crate flate2;
extern crate read_progress;
extern crate rusqlite;
extern crate serde_json;

use std::borrow::Cow;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
use std::time::Instant;

use clap::{App, Arg};
use osmio::{OSMObj, OSMObjBase, OSMObjectType, OSMReader};

use anyhow::{Context, Result};
use flate2::write::GzEncoder;
use flate2::Compression;
use read_progress::ReaderWithSize;
use rusqlite::{Connection, OptionalExtension};

#[allow(clippy::upper_case_acronyms)]
enum OutputFormat {
    CSV,
    TSV,
}

#[derive(Debug)]
enum Column {
    Key,
    NewValue,
    OldValue,
    Id,
    RawId,
    NewVersion,
    OldVersion,
    IsoDatetime,
    EpochDatetime,
    Username,
    Uid,
    ChangesetId,

    ChangesetTag(String),

    TagCountDelta,
    ObjectTypeShort,
    ObjectTypeLong,
}

impl FromStr for Column {
    type Err = anyhow::Error;
    fn from_str(val: &str) -> Result<Self, Self::Err> {
        match val.to_lowercase().trim() {
            "key" => Ok(Column::Key),
            "new_value" => Ok(Column::NewValue),
            "old_value" => Ok(Column::OldValue),
            "id" => Ok(Column::Id),
            "raw_id" => Ok(Column::RawId),
            "new_version" => Ok(Column::NewVersion),
            "old_version" => Ok(Column::OldVersion),
            "datetime" | "iso_datetime" | "iso_timestamp" => Ok(Column::IsoDatetime),
            "epoch" | "epoch_datetime" | "epoch_timestamp" => Ok(Column::EpochDatetime),
            "username" => Ok(Column::Username),
            "uid" => Ok(Column::Uid),
            "changeset_id" => Ok(Column::ChangesetId),
            col if col.starts_with("changeset.") => Ok(Column::ChangesetTag(
                col.strip_prefix("changeset.").unwrap().to_string(),
            )),
            "tag_count_delta" => Ok(Column::TagCountDelta),
            "object_type_short" => Ok(Column::ObjectTypeShort),
            "object_type_long" => Ok(Column::ObjectTypeLong),

            col => Err(anyhow::anyhow!("Unknown column value: {}", col)),
        }
    }
}

impl Column {
    fn is_changeset_tag(&self) -> bool {
        matches!(self, Column::ChangesetTag(_))
    }

    fn header(&self) -> Cow<str> {
        match self {
            Column::Key => "key".into(),
            Column::NewValue => "new_value".into(),
            Column::OldValue => "old_value".into(),
            Column::Id => "id".into(),
            Column::RawId => "raw_id".into(),
            Column::NewVersion => "new_version".into(),
            Column::OldVersion => "old_version".into(),
            Column::IsoDatetime => "iso_datetime".into(),
            Column::EpochDatetime => "epoch_datetime".into(),
            Column::Username => "username".into(),
            Column::Uid => "uid".into(),
            Column::ChangesetId => "changeset_id".into(),
            Column::ChangesetTag(t) => format!("changeset_{}", t).into(),
            Column::TagCountDelta => "tag_count_delta".into(),
            Column::ObjectTypeShort => "object_type_short".into(),
            Column::ObjectTypeLong => "object_type_long".into(),
        }
    }
}

fn main() -> Result<()> {
    let matches = App::new("osm-tag-csv-history")
        .version(env!("CARGO_PKG_VERSION"))
        .about("Create a CSV file detailing tagging changes in an OSM file")

        .arg(Arg::with_name("input")
             .short("i").long("input")
             .value_name("INPUT.osh.pbf")
             .help("Input file to convert.")
             .long_help("Read OSM data from this file. If it's a .osh.pbf history file, the full history will be output. Regular non-history files can be processed too")
             .takes_value(true).required(true)
             )

        .arg(Arg::with_name("output")
             .short("o").long("output")
             .value_name("OUTPUT.csv[.gz]")
             .help("Where to write the output. Use - for stdout. with auto compression (default), if this file ends with .gz, then it will be gzip compressed")
             .takes_value(true).required(true)
             )

        .arg(Arg::with_name("verbosity")
             .short("v").multiple(true)
             .help("Increase verbosity")
             )

        .arg(Arg::with_name("header")
             .long("header")
             .takes_value(false).required(false)
             .help("Include a CSV header (default)")
             .conflicts_with("no-header")
             )

        .arg(Arg::with_name("no-header")
             .long("no-header")
             .takes_value(false).required(false)
             .help("Do not include a CSV header")
             .conflicts_with("header")
             )

        .arg(Arg::with_name("compression")
             .short("c").long("compression")
             .takes_value(true).required(false)
             .possible_values(&["none", "auto", "gzip"])
             .hidden_short_help(true)
             .default_value("auto")
             .help("Should the output file be compressed?")
             .long_help("Should the CSV output be compress?\nnone = don't compress the output\ngzip = always compress output with gzip\nauto (default) = uncompressed unless the output filename ends in .gz")
             )

        .arg(Arg::with_name("log-frequency")
             .long("log-frequency")
             .value_name("SEC")
             .takes_value(true).required(false)
             .hidden_short_help(true)
             .default_value("10")
             .help("with -v, how often (in sec.) to print progress messages")
             )

        .arg(Arg::with_name("tag")
             .short("t").long("tag")
             .value_name("TAG")
             .takes_value(true).required(false)
             .multiple(true).number_of_values(1)
             .help("Only include changes to this tag (can be specified multiple times)")
             )


        .arg(Arg::with_name("changeset_filename")
             .long("changesets")
             .value_name("changesets-latest.osm.bz2")
             .takes_value(true).required(false)
             .help("Filename of the changeset file")
             )

        .arg(Arg::with_name("uid")
             .long("uid")
             .value_name("USERID")
             .takes_value(true).required(false)
             .multiple(true).number_of_values(1)
             .help("Only include changes made by this OSM user (by userid)")
             )


        .arg(Arg::with_name("output_format")
             .long("output-format")
             .takes_value(true).required(false)
             .help("output format")
             .possible_values(&["auto", "csv", "tsv"])
             .hidden_short_help(true)
             .default_value("auto")
             )

        .arg(Arg::with_name("columns")
             .short("C").long("columns")
             .takes_value(true).required(false)
             .default_value("key,new_value,old_value,id,new_version,old_version,datetime,username,uid,changeset_id")
             )

        .arg(Arg::with_name("object-types")
             .short("T").long("object-types")
             .value_name("nwr")
             .help("Only include these OSM Object types")
             .long_help("Only include these OSM Object types. Specify a letter for each type (n)ode/(w)way/(r)elation, e.g. -T wr = only ways & relations")
             .takes_value(true).required(false)
             .default_value("nwr")
             )


        .get_matches();

    env_logger::builder()
        .filter_level(match matches.occurrences_of("verbosity") {
            0 => log::LevelFilter::Warn,
            1 => log::LevelFilter::Info,
            2 => log::LevelFilter::Debug,
            _ => log::LevelFilter::Trace,
        })
        .init();

    let input_path = matches.value_of("input").unwrap();
    info!("Beginning processing of {}", input_path);

    let log_frequency: f32 = matches.value_of("log-frequency").unwrap().parse()?;

    let file =
        File::open(input_path).with_context(|| format!("opening input file {}", input_path))?;
    let mut osm_obj_reader =
        osmio::pbf::PBFReader::new(BufReader::new(ReaderWithSize::from_file(file)?));
    let mut objects_iter = osm_obj_reader.objects();

    let only_include_tags: Option<Vec<String>> = matches
        .values_of("tag")
        .map(|ts| ts.map(|s| s.to_string()).collect());

    let only_include_uids: Option<Vec<u32>> = match matches.values_of("uid") {
        None => None,
        Some(vals) => Some(vals.map(|u| Ok(u.parse()?)).collect::<Result<Vec<u32>>>()?),
    };

    let only_include_types = match matches.value_of("object-types") {
        None => (true, true, true),
        Some(object_types) => {
            let object_types = object_types.to_lowercase();
            (
                object_types.contains('n'),
                object_types.contains('w'),
                object_types.contains('r'),
            )
        }
    };

    let columns: Vec<Column> = matches
        .value_of("columns")
        .unwrap()
        .split(',')
        .map(|col_str| col_str.parse())
        .collect::<Result<Vec<Column>>>()?;
    debug!("columns: {:?}", columns);

    if let Some(only_include_tags) = only_include_tags.as_ref() {
        info!(
            "Only including changes to these {} tag(s): {:?}",
            only_include_tags.len(),
            only_include_tags
        );
    }

    if let Some(only_include_uids) = only_include_uids.as_ref() {
        info!(
            "Only including changes made by user id {:?}",
            only_include_uids
        );
    }

    // MUST be replaced with above columns
    // changesets?
    let changeset_lookup = if columns.iter().any(Column::is_changeset_tag) {
        let lookup =
            ChangesetTagLookup::from_filename(matches.value_of("changeset_filename").unwrap())?;
        debug!(
            "Reading changeset sqlite from {}",
            matches.value_of("changeset_filename").unwrap()
        );
        Some(lookup)
    } else {
        None
    };

    let include_header = match (
        matches.is_present("header"),
        matches.is_present("no-header"),
    ) {
        (false, false) => true,
        (true, false) => true,
        (false, true) => false,
        (true, true) => unreachable!(),
    };

    let output_format = match (
        matches.value_of("output_format"),
        matches.value_of("output"),
    ) {
        (Some("csv"), _) => OutputFormat::CSV,
        (Some("tsv"), _) => OutputFormat::TSV,
        (Some("auto"), Some("-")) => OutputFormat::CSV,
        (Some("auto"), Some(filename)) if filename.starts_with("/dev/fd/") => OutputFormat::CSV,
        (Some("auto"), Some(filename))
            if filename.ends_with(".csv") || filename.ends_with(".csv.gz") =>
        {
            OutputFormat::CSV
        }
        (Some("auto"), Some(filename))
            if filename.ends_with(".tsv") || filename.ends_with(".tsv.gz") =>
        {
            OutputFormat::TSV
        }
        (format, filename) => unreachable!(
            "Unable to determine output format: format={:?} filename={:?}",
            format, filename
        ),
    };

    let output_path = matches.value_of("output").unwrap();
    let output_writer: Box<dyn std::io::Write> = if output_path == "-" {
        Box::new(std::io::stdout())
    } else {
        Box::new(File::create(matches.value_of("output").unwrap())?)
    };
    let output_writer = match matches.value_of("compression") {
        Some("auto") => {
            if output_path == "-" || output_path.starts_with("/dev/fd/") {
                // stdout, so no compression
                trace!("Output is '-' or a FD, no compression");
                output_writer
            } else if output_path.ends_with(".csv.gz") || output_path.ends_with(".tsv.gz") {
                trace!("Output file ends with .[ct]sv.gz so using regular gzip");
                Box::new(GzEncoder::new(output_writer, Compression::default()))
            } else if output_path.ends_with(".csv") || output_path.ends_with(".tsv") {
                // uncompressed
                trace!("Output file ends with .[ct]sv so no compression");
                output_writer
            } else {
                bail!(
                    "Cannot auto-detect output compression format: {:?}",
                    output_path
                );
            }
        }
        Some("none") => output_writer,
        Some("gzip") => Box::new(GzEncoder::new(output_writer, Compression::default())),
        _ => unreachable!(),
    };
    let mut output = csv::WriterBuilder::new();
    match output_format {
        OutputFormat::CSV => {}
        OutputFormat::TSV => {
            output.delimiter(b'\t');
        }
    }
    let mut output = output.from_writer(output_writer);

    if include_header {
        trace!("Writing CSV header");
        for c in columns.iter() {
            output.write_field(c.header().as_ref())?;
        }

        output.write_record(None::<&[u8]>)?;
    }

    let mut curr = objects_iter.next().unwrap();
    let mut last: Option<osmio::obj_types::ArcOSMObj> = None;

    let mut num_objects = 0;

    let mut time_counter = do_every::DoEvery::new();

    let mut field_bytes = Vec::with_capacity(25);
    let mut utf8_bytes_buffer = vec![0; 4];
    let started_processing = Instant::now();
    let mut passes_uid_check;
    let mut passes_type_check;

    loop {
        // Logging output
        num_objects += 1;
        if num_objects % 1000 == 0 && time_counter.should_do_every_sec(log_frequency) {
            let reader = objects_iter.inner().inner().get_ref();
            info!(
                "Running: {:.3}% done ETA: {} est. total: {}",
                reader.fraction() * 100.,
                reader
                    .eta()
                    .map(|d| format_time(&d))
                    .unwrap_or_else(|| "N/A".to_string()),
                reader
                    .est_total_time()
                    .map(|d| format_time(&d))
                    .unwrap_or_else(|| "N/A".to_string()),
            );
            num_objects = 1;
        }

        passes_uid_check = if let (Some(this_uid), Some(only_include_uids)) =
            (curr.uid(), only_include_uids.as_ref())
        {
            // We have uid's & we're filtering based on uids
            only_include_uids.iter().any(|u| u == &this_uid)
        } else {
            true
        };

        passes_type_check = matches!(
            (curr.object_type(), only_include_types),
            (OSMObjectType::Node, (true, _, _))
                | (OSMObjectType::Way, (_, true, _))
                | (OSMObjectType::Relation, (_, _, true))
        );

        let has_tags = match last {
            None => curr.tagged(),
            Some(ref l) => l.tagged() || curr.tagged(),
        };
        let process_object = has_tags && passes_uid_check && passes_type_check;

        // The 'only_include_tags' could be checked here to speed it up

        if process_object {
            let (last_tags, last_version) = match last {
                None => (None, "".to_string()),
                Some(ref last) => {
                    ensure!(
                        sorted_objects(last, &curr) == Ordering::Less,
                        "Non sorted input"
                    );
                    if last.object_type() == curr.object_type() && last.id() == curr.id() {
                        (
                            Some(last.tags().collect::<HashMap<_, _>>()),
                            last.version().unwrap().to_string(),
                        )
                    } else {
                        (None, "".to_string())
                    }
                }
            };

            let curr_tags: BTreeMap<_, _> = curr.tags().collect();
            let mut keys: Vec<_> = curr_tags.keys().collect();
            if let Some(ref lt) = last_tags {
                keys.extend(lt.keys());
            }
            keys.sort();
            keys.dedup();

            let mut last_value: &str;
            let mut last_value_existed;
            let mut curr_value: &str;
            let mut curr_value_exists;

            for key in keys.into_iter() {
                // Should we skip this tag?
                if only_include_tags
                    .as_ref()
                    .map_or(false, |only_include_tags| {
                        !only_include_tags.iter().any(|t| t == key)
                    })
                {
                    continue;
                }
                if let Some(&value) = last_tags.as_ref().and_then(|lt| lt.get(key)) {
                    last_value = value;
                    last_value_existed = true;
                } else {
                    last_value = "";
                    last_value_existed = false;
                };

                if let Some(value) = curr_tags.get(key) {
                    curr_value = value;
                    curr_value_exists = true;
                } else {
                    curr_value = "";
                    curr_value_exists = false;
                };
                if last_value == curr_value {
                    continue;
                }

                trace!(
                    "Write tag change {} {:?} → {:?} ({}→{})",
                    key,
                    last_value,
                    curr_value,
                    last_value_existed,
                    curr_value_exists,
                );

                for column in columns.iter() {
                    field_bytes.clear();
                    match column {
                        Column::Key => {
                            encode_field(key, &mut field_bytes, &mut utf8_bytes_buffer);
                        }
                        Column::NewValue => {
                            encode_field(curr_value, &mut field_bytes, &mut utf8_bytes_buffer);
                        }
                        Column::OldValue => {
                            encode_field(last_value, &mut field_bytes, &mut utf8_bytes_buffer);
                        }
                        Column::Id => {
                            field_bytes.extend(
                                format!("{:?}{}", curr.object_type(), curr.id())
                                    .as_str()
                                    .bytes(),
                            );
                        }
                        Column::RawId => field_bytes.extend(curr.id().to_string().as_str().bytes()),
                        Column::NewVersion => {
                            field_bytes.extend(curr.version().unwrap().to_string().bytes());
                        }
                        Column::OldVersion => {
                            field_bytes.extend(last_version.as_str().bytes());
                        }
                        Column::IsoDatetime => {
                            field_bytes
                                .extend(curr.timestamp().as_ref().unwrap().to_iso_string().bytes());
                        }
                        Column::EpochDatetime => {
                            field_bytes.extend(
                                curr.timestamp()
                                    .as_ref()
                                    .unwrap()
                                    .to_epoch_number()
                                    .to_string()
                                    .bytes(),
                            );
                        }
                        Column::Username => {
                            encode_field(
                                curr.user().unwrap(),
                                &mut field_bytes,
                                &mut utf8_bytes_buffer,
                            );
                        }
                        Column::Uid => {
                            field_bytes.extend(curr.uid().unwrap().to_string().bytes());
                        }
                        Column::ChangesetId => {
                            field_bytes.extend(curr.changeset_id().unwrap().to_string().bytes());
                        }
                        Column::ObjectTypeShort => {
                            field_bytes.extend(match curr.object_type() {
                                OSMObjectType::Node => b"n",
                                OSMObjectType::Way => b"w",
                                OSMObjectType::Relation => b"r",
                            });
                        }
                        Column::ObjectTypeLong => {
                            field_bytes.extend(match curr.object_type() {
                                OSMObjectType::Node => b"node".iter(),
                                OSMObjectType::Way => b"way".iter(),
                                OSMObjectType::Relation => b"relation".iter(),
                            });
                        }
                        Column::ChangesetTag(changeset_tag) => {
                            match changeset_lookup
                                .as_ref()
                                .unwrap()
                                .tags(curr.changeset_id().unwrap())?
                            {
                                None => {
                                    trace!("No tags found for changeset {:?}", curr.changeset_id());
                                }
                                Some(tags_for_changeset) => {
                                    if let Some(v) =
                                        tags_for_changeset
                                            .iter()
                                            .filter_map(|(k, v)| {
                                                if k == changeset_tag {
                                                    Some(v)
                                                } else {
                                                    None
                                                }
                                            })
                                            .next()
                                    {
                                        field_bytes.extend(v.bytes());
                                    }
                                }
                            }
                        }
                        Column::TagCountDelta => {
                            field_bytes.extend(match (last_value_existed, curr_value_exists) {
                                (false, false) => unreachable!(),
                                (false, true) => b"+1".iter(),
                                (true, false) => b"-1".iter(),
                                (true, true) => b"0".iter(),
                            });
                        }
                    }
                    output.write_field(&field_bytes)?;
                }

                output.write_record(None::<&[u8]>)?;
            }
        }

        last = Some(curr);
        curr = match objects_iter.next() {
            None => {
                break;
            }
            Some(o) => o,
        };
    }

    info!(
        "Finished in {}",
        format_time(&(Instant::now() - started_processing))
    );
    Ok(())
}

fn encode_field(field: &str, bytes: &mut Vec<u8>, utf8_bytes_buffer: &mut Vec<u8>) {
    bytes.clear();

    for c in field.chars() {
        if c == '\t' {
            bytes.push(b'\\');
            bytes.push(b't');
        } else if c == '\n' {
            bytes.push(b'\\');
            bytes.push(b'n');
        } else {
            c.encode_utf8(utf8_bytes_buffer);
            bytes.extend(&utf8_bytes_buffer[..c.len_utf8()]);
        }
    }
}

fn sorted_objects(a: &impl OSMObj, b: &impl OSMObj) -> std::cmp::Ordering {
    a.object_type()
        .cmp(&b.object_type())
        .then(a.id().cmp(&b.id()))
        .then(a.version().cmp(&b.version()))
}

pub fn format_time(duration: &std::time::Duration) -> String {
    let sec = duration.as_secs_f32().round() as u64;
    if sec < 60 {
        format!("{:2}s", sec)
    } else {
        let (min, sec) = (sec / 60, sec % 60);
        if min < 60 {
            format!("{:2}m{:02}s", min, sec)
        } else {
            let (hr, min) = (min / 60, min % 60);
            if hr < 24 {
                format!("{}h{:02}m{:02}s", hr, min, sec)
            } else {
                let (day, hr) = (hr / 24, hr % 24);
                format!("{}d{}h{:02}m{:02}s", day, hr, min, sec)
            }
        }
    }
}

struct ChangesetTagLookup {
    conn: Connection,
}

impl ChangesetTagLookup {
    fn from_filename(filename: &str) -> Result<Self> {
        let conn = Connection::open(filename)?;
        Ok(ChangesetTagLookup { conn })
    }

    fn tags(&self, cid: u32) -> Result<Option<Vec<(String, String)>>> {
        let res: Option<Vec<u8>> = self
            .conn
            .query_row(
                "select other_tags from changeset_tags where id = ?1;",
                [cid],
                |row| row.get(0),
            )
            .optional()?;
        match res {
            None => Ok(None),
            Some(tags) => {
                let tags: Vec<(String, String)> = serde_json::from_slice(&tags)?;
                Ok(Some(tags))
            }
        }
    }
}