soroban-cli 0.4.0

Soroban CLI
use chrono::{DateTime, NaiveDateTime, Utc};
use clap::Parser;
use std::{fs, io, path};
use termcolor::{Color, ColorChoice, StandardStream, WriteColor};
use termcolor_output::colored;

use soroban_env_host::{
    events::{self},
    xdr::{self, ReadXdr, WriteXdr},
};

use crate::{rpc, toid, utils};
use crate::{HEADING_RPC, HEADING_SANDBOX};

// Command-line arguments for the event-listing command.
//
// NOTE: the `///` comments on the fields below are rendered by clap as the
// `--help` text for each flag, so they are user-facing strings and must stay
// in sync with the limits enforced by the attributes (e.g. `max_values`).
// This header intentionally uses `//` so that it does not become the
// command's "about" text.
#[derive(Parser, Debug)]
#[clap()]
pub struct Cmd {
    /// The first ledger sequence number in the range to pull events (required
    /// if not in sandbox mode).
    #[clap(short, long, default_value = "0")]
    start_ledger: u32,

    /// The last (and inclusive) ledger sequence number in the range to pull
    /// events (required if not in sandbox mode).
    /// https://developers.stellar.org/docs/encyclopedia/ledger-headers#ledger-sequence
    #[clap(short, long, default_value = "0")]
    end_ledger: u32,

    /// Output formatting options for event stream
    #[clap(long, arg_enum, default_value = "pretty")]
    output: OutputFormat,

    /// The maximum number of events to display (specify "0" to show all events
    /// when using sandbox, or to defer to the server-defined limit if using
    /// RPC).
    #[clap(short, long, default_value = "10")]
    count: usize,

    /// RPC server endpoint
    #[clap(long,
        env = "SOROBAN_RPC_URL",
        help_heading = HEADING_RPC,
        conflicts_with = "events-file",
    )]
    rpc_url: Option<String>,

    /// Local event store (likely generated by `invoke`) to pull events from
    #[clap(
        long,
        parse(from_os_str),
        value_name = "PATH",
        env = "SOROBAN_EVENTS_FILE",
        help_heading = HEADING_SANDBOX,
        conflicts_with = "rpc-url",
    )]
    events_file: Option<std::path::PathBuf>,

    /// A set of (up to 5) contract IDs to filter events on. This parameter can
    /// be passed multiple times, e.g. `--id abc --id def`, or passed with
    /// multiple parameters, e.g. `--id abc def`.
    ///
    /// Though the specification supports multiple filter objects (i.e.
    /// combinations of type, IDs, and topics), only one set can be specified on
    /// the command-line today, though that set can have multiple IDs/topics.
    #[clap(long = "id", multiple = true, max_values(5), help_heading = "FILTERS")]
    contract_ids: Vec<String>,

    /// A set of (up to 5) topic filters to filter event topics on. A single
    /// topic filter can contain 1-4 different segment filters, separated by
    /// commas, with an asterisk (* character) indicating a wildcard segment.
    ///
    /// For example, this is one topic filter with two segments:
    ///
    ///     --topic "AAAABQAAAAdDT1VOVEVSAA==,*"
    ///
    /// This is two topic filters with one and two segments each:
    ///
    ///     --topic "AAAABQAAAAdDT1VOVEVSAA==" --topic '*,*'
    ///
    /// Note that all of these topic filters are combined with the contract IDs
    /// into a single filter (i.e. combination of type, IDs, and topics).
    // The number of *filters* is capped by `max_values(5)` below; the number
    // of *segments* inside one filter (1-4) is validated in `Cmd::run`.
    #[clap(
        long = "topic",
        multiple = true,
        max_values(5),
        help_heading = "FILTERS"
    )]
    topic_filters: Vec<String>,

    /// Specifies which type of contract events to display.
    #[clap(
        long = "type",
        arg_enum,
        default_value = "all",
        help_heading = "FILTERS"
    )]
    event_type: rpc::EventType,
}

/// Everything that can go wrong while validating arguments, fetching events
/// (from an RPC server or the local events file), printing them, or
/// committing new events to disk.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// `--start-ledger` was greater than `--end-ledger`.
    #[error("invalid ledger range: --start-ledger bigger than --end-ledger ({low} > {high})")]
    InvalidLedgerRange { low: u32, high: u32 },

    /// The sandbox events file passed on the command line does not exist.
    #[error("filepath does not exist: {path}")]
    InvalidFile { path: String },

    /// The sandbox events file exists but could not be opened or parsed;
    /// `error` carries the stringified underlying failure.
    #[error("filepath ({path}) cannot be read: {error}")]
    CannotReadFile { path: String, error: String },

    /// A `--topic` filter has too many comma-separated segments.
    #[error("cannot parse topic filter {topic} into 1-4 segments")]
    InvalidTopicFilter { topic: String },

    /// A non-wildcard segment of a `--topic` filter is not a base64-encoded
    /// XDR `ScVal`.
    #[error("invalid segment ({segment}) in topic filter ({topic}): {error}")]
    InvalidSegment {
        topic: String,
        segment: String,
        error: xdr::Error,
    },

    /// An `--id` value could not be parsed as a contract ID.
    #[error("cannot parse contract ID {contract_id}: {error}")]
    InvalidContractId {
        contract_id: String,
        error: hex::FromHexError,
    },

    /// An event could not be serialized to JSON for output; `debug` holds the
    /// event's `Debug` rendering.
    #[error("invalid JSON string: {error} ({debug})")]
    InvalidJson {
        debug: String,
        error: serde_json::Error,
    },

    /// The ledger timestamp could not be converted into a calendar datetime
    /// when committing events.
    #[error("invalid timestamp in event: {ts}")]
    InvalidTimestamp { ts: String },

    /// Neither an RPC URL nor an events file was supplied.
    #[error("you must specify either an RPC server or sandbox filepath(s)")]
    TargetRequired,

    /// RPC mode was selected but both ledger bounds were left at zero.
    #[error("ledger range (-s and -e) is required when specifying an RPC server")]
    LedgerRangeRequired,

    /// Failure reported by the RPC client.
    #[error(transparent)]
    Rpc(#[from] rpc::Error),

    /// Catch-all for boxed errors (e.g. from the event printers, or ad-hoc
    /// string errors).
    #[error(transparent)]
    Generic(#[from] Box<dyn std::error::Error>),

    /// Filesystem / I/O failure.
    #[error(transparent)]
    Io(#[from] io::Error),

    /// XDR encoding or decoding failure.
    #[error(transparent)]
    Xdr(#[from] xdr::Error),

    /// JSON (de)serialization failure.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
}

// Selects how `Cmd::run` renders each event: `Json` pretty-prints the
// serialized event, `Plain` goes through `print_event`, and `Pretty` goes
// through `pretty_print_event`.
//
// NOTE(review): because this derives `clap::ArgEnum`, the `///` comments on
// the variants are presumably picked up by clap as help text; they are left
// untouched for that reason.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, clap::ArgEnum)]
pub enum OutputFormat {
    /// Colorful, human-oriented console output
    Pretty,
    /// Human-oriented console output without colors
    Plain,
    /// JSONified console output
    Json,
}

impl Cmd {
    pub async fn run(&self, _matches: &clap::ArgMatches) -> Result<(), Error> {
        if self.start_ledger > self.end_ledger {
            return Err(Error::InvalidLedgerRange {
                low: self.start_ledger,
                high: self.end_ledger,
            });
        }

        // Validate that topics are made up of segments.
        for topic in &self.topic_filters {
            for (i, segment) in topic.split(',').enumerate() {
                if i > 4 {
                    return Err(Error::InvalidTopicFilter {
                        topic: topic.to_string(),
                    });
                }

                if segment != "*" {
                    if let Err(e) = xdr::ScVal::from_xdr_base64(segment) {
                        return Err(Error::InvalidSegment {
                            topic: topic.to_string(),
                            segment: segment.to_string(),
                            error: e,
                        });
                    }
                }
            }
        }

        let events = match (self.rpc_url.as_ref(), self.events_file.as_ref()) {
            (Some(rpc_url), _) => self.run_against_rpc_server(rpc_url).await,
            (_, Some(path)) => self.run_in_sandbox(path),
            _ => Err(Error::TargetRequired),
        }?;

        for event in &events {
            match self.output {
                // Should we pretty-print the JSON like we're doing here or just
                // dump an event in raw JSON on each line? The latter is easier
                // to consume programmatically.
                OutputFormat::Json => {
                    println!(
                        "{}",
                        serde_json::to_string_pretty(&event).map_err(|e| {
                            Error::InvalidJson {
                                debug: format!("{event:#?}"),
                                error: e,
                            }
                        })?,
                    );
                }
                OutputFormat::Plain => print_event(event)?,
                OutputFormat::Pretty => pretty_print_event(event)?,
            }
        }

        Ok(())
    }

    async fn run_against_rpc_server(&self, rpc_url: &str) -> Result<Vec<rpc::Event>, Error> {
        if self.start_ledger == 0 && self.end_ledger == 0 {
            return Err(Error::LedgerRangeRequired);
        }

        for raw_contract_id in &self.contract_ids {
            // We parse the contract IDs to ensure they're the correct format,
            // but since we'll be passing them as-is to the RPC server anyway,
            // we disregard the return value.
            utils::id_from_str::<32>(raw_contract_id).map_err(|e| Error::InvalidContractId {
                contract_id: raw_contract_id.clone(),
                error: e,
            })?;
        }

        let client = rpc::Client::new(rpc_url);
        Ok(client
            .get_events(
                self.start_ledger,
                self.end_ledger,
                Some(self.event_type),
                &self.contract_ids,
                &self.topic_filters,
                Some(self.count),
            )
            .await?
            .unwrap_or_default())
    }

    fn run_in_sandbox(&self, path: &path::PathBuf) -> Result<Vec<rpc::Event>, Error> {
        if !path.exists() {
            return Err(Error::InvalidFile {
                path: path.to_str().unwrap().to_string(),
            });
        }

        let count: usize = if self.count == 0 {
            std::usize::MAX
        } else {
            self.count
        };

        // Read the JSON events from disk and find the ones that match the
        // contract ID filter(s) that were passed in.
        Ok(read(path)
            .map_err(|err| Error::CannotReadFile {
                path: path.to_str().unwrap().to_string(),
                error: err.to_string(),
            })?
            .iter()
            // FIXME: We assume here that events are read off-disk in
            // chronological order, but we should probably be sorting by ledger
            // number (and ID, for events within the same ledger), instead,
            // though it's likely that this logic belongs more in
            // `snapshot::read_events()`.
            .rev()
            .filter(|evt| {
                // The ledger range is optional in sandbox mode.
                if self.start_ledger == 0 && self.end_ledger == 0 {
                    return true;
                }

                match evt.ledger.parse::<u32>() {
                    Ok(seq) => seq >= self.start_ledger && seq <= self.end_ledger,
                    Err(e) => {
                        eprintln!("error parsing key 'ledger': {e:?}");
                        eprintln!(
                            "your sandbox events file ('{}') may be corrupt",
                            path.to_str().unwrap(),
                        );
                        eprintln!("ignoring this event: {evt:#?}");

                        false
                    }
                }
            })
            .filter(|evt| {
                // Contract ID filter(s) are optional, so we should render all
                // events if they're omitted.
                self.contract_ids.is_empty()
                    || self.contract_ids.iter().any(|id| *id == evt.contract_id)
            })
            .filter(|evt| {
                // Like before, no topic filters means pass everything through.
                self.topic_filters.is_empty() ||
                // Reminder: All of the topic filters are part of a single
                // filter object, and each one contains segments, so we need to
                // apply all of them to the given event.
                self.topic_filters
                    .iter()
                    // quadratic, but both are <= 5 long
                    .any(|f| {
                        does_topic_match(
                            &evt.topic,
                            // misc. Rust nonsense: make a copy over the given
                            // split filter, because passing a slice of
                            // references is too much for this language to
                            // handle
                            &f.split(',')
                            .map(std::string::ToString::to_string)
                            .collect::<Vec<String>>()
                        )
                    })
            })
            .take(count)
            .cloned()
            .collect::<Vec<rpc::Event>>())
    }
}

/// Reports whether `filter` matches `topic`, following the same semantics as
/// the RPC server:
///
///  - a filter segment of `"*"` is a wildcard that matches exactly one topic
///    segment, whatever its value;
///  - any other filter segment (a base64-encoded ScVal) must be string-equal
///    to the topic segment in the same position.
///
/// `filter` is the already-split (and previously validated) list of segments
/// of one topic filter, and `topic` is the list of segments of the event being
/// tested. Both lists must have the same length for a match, so an empty
/// filter only matches an empty topic.
///
/// [API
/// Reference](https://docs.google.com/document/d/1TZUDgo_3zPz7TiPMMHVW_mtogjLyPL0plvzGMsxSz6A/edit#bookmark=id.35t97rnag3tx)
/// [Code
/// Reference](https://github.com/stellar/soroban-tools/blob/bac1be79e8c2590c9c35ad8a0168aab0ae2b4171/cmd/soroban-rpc/internal/methods/get_events.go#L182-L203)
pub fn does_topic_match(topic: &[String], filter: &[String]) -> bool {
    // A wildcard never spans several segments, so too few or too many topic
    // segments can never match.
    if topic.len() != filter.len() {
        return false;
    }

    filter
        .iter()
        .zip(topic)
        .all(|(wanted, actual)| wanted.as_str() == "*" || wanted == actual)
}

/// Writes an uncoloured, human-readable rendering of `event` to stdout: a
/// header with the paging token and upper-cased event type, the ledger and
/// its close time, the contract ID, every topic, and finally the value.
///
/// Topics and the value are stored as base64-encoded XDR and are decoded to
/// `ScVal` so that they can be shown in `Debug` form.
///
/// # Errors
///
/// Returns the decoding error if a topic or the value is not valid
/// base64-encoded XDR. Lines printed before the failure stay printed.
pub fn print_event(event: &rpc::Event) -> Result<(), Box<dyn std::error::Error>> {
    let kind = event.event_type.to_ascii_uppercase();
    println!("Event {} [{}]:", event.paging_token, kind);
    println!(
        "  Ledger:   {} (closed at {})",
        event.ledger, event.ledger_closed_at
    );
    println!("  Contract: {}", event.contract_id);

    println!("  Topics:");
    for encoded in &event.topic {
        let decoded = xdr::ScVal::from_xdr_base64(encoded)?;
        println!("            {decoded:?}");
    }

    let value = xdr::ScVal::from_xdr_base64(&event.value.xdr)?;
    println!("  Value:    {value:?}");

    Ok(())
}

/// Writes a coloured, human-readable rendering of `event` to stdout, using
/// the same layout as [`print_event`].
///
/// If stdout does not support colour, this defers to [`print_event`]. System
/// events get a yellow type tag and every other type a blue one; the
/// remaining values are printed in green.
///
/// # Errors
///
/// Returns an error if a topic or the value is not valid base64-encoded XDR,
/// or if writing to stdout fails. Lines printed before the failure stay
/// printed.
pub fn pretty_print_event(event: &rpc::Event) -> Result<(), Box<dyn std::error::Error>> {
    let mut stdout = StandardStream::stdout(ColorChoice::Auto);
    if !stdout.supports_color() {
        return print_event(event);
    }

    let color = match event.event_type.as_str() {
        "system" => Color::Yellow,
        _ => Color::Blue,
    };
    colored!(
        stdout,
        "{}Event{} {}{}{} [{}{}{}{}]:\n",
        bold!(true),
        bold!(false),
        fg!(Some(Color::Green)),
        event.paging_token,
        reset!(),
        bold!(true),
        fg!(Some(color)),
        event.event_type.to_ascii_uppercase(),
        reset!(),
    )?;

    colored!(
        stdout,
        "  Ledger:   {}{}{} (closed at {}{}{})\n",
        fg!(Some(Color::Green)),
        event.ledger,
        reset!(),
        fg!(Some(Color::Green)),
        event.ledger_closed_at,
        reset!(),
    )?;

    // NOTE(review): the contract ID carries a "0x" prefix here but not in
    // `print_event`; left as-is since it is unclear which form is intended.
    colored!(
        stdout,
        "  Contract: {}0x{}{}\n",
        fg!(Some(Color::Green)),
        event.contract_id,
        reset!(),
    )?;

    colored!(stdout, "  Topics:\n")?;
    for topic in &event.topic {
        let scval = xdr::ScVal::from_xdr_base64(topic)?;
        colored!(
            stdout,
            "            {}{:?}{}\n",
            fg!(Some(Color::Green)),
            scval,
            reset!(),
        )?;
    }

    // The label is padded so the value lines up with the other columns,
    // exactly like the plain printer's "  Value:    ".
    let scval = xdr::ScVal::from_xdr_base64(&event.value.xdr)?;
    colored!(
        stdout,
        "  Value:    {}{:?}{}\n",
        fg!(Some(Color::Green)),
        scval,
        reset!(),
    )?;

    Ok(())
}

/// Returns a list of events from the on-disk event store, which stores events
/// exactly as they'd be returned by an RPC server.
pub fn read(path: &std::path::PathBuf) -> Result<Vec<rpc::Event>, Error> {
    let reader = std::fs::OpenOptions::new().read(true).open(path)?;
    Ok(serde_json::from_reader(reader)?)
}

/// Reads the existing event file, appends the new events, and writes it all to
/// disk. Note that this almost certainly isn't safe to call in parallel.
pub fn commit(
    new_events: &[events::HostEvent],
    ledger_info: &soroban_ledger_snapshot::LedgerSnapshot,
    output_file: &std::path::PathBuf,
) -> Result<(), Error> {
    // Create the directory tree if necessary, since these are unlikely to be
    // the first events.
    if let Some(dir) = output_file.parent() {
        if !dir.exists() {
            fs::create_dir_all(dir)?;
        }
    }

    let mut events: rpc::GetEventsResponse = if path::Path::exists(output_file) {
        let mut file = fs::OpenOptions::new().read(true).open(output_file)?;
        serde_json::from_reader(&mut file)?
    } else {
        vec![]
    };

    for (i, event) in new_events.iter().enumerate() {
        let contract_event = match event {
            events::HostEvent::Contract(e) => e,
            events::HostEvent::Debug(e) => {
                return Err(Error::Generic(
                    format!("debug events unsupported: {e:#?}").into(),
                ))
            }
        };

        let topics = match &contract_event.body {
            xdr::ContractEventBody::V0(e) => &e.topics,
        }
        .iter()
        .map(xdr::WriteXdr::to_xdr_base64)
        .collect::<Result<Vec<String>, _>>()?;

        // stolen from
        // https://github.com/stellar/soroban-tools/blob/main/cmd/soroban-rpc/internal/methods/get_events.go#L264
        let id = format!(
            "{}-{:010}",
            toid::Toid::new(
                ledger_info.sequence_number,
                // we should technically inject the tx order here from the
                // ledger info, but the sandbox does one tx/op per ledger
                // anyway, so this is a safe assumption
                1,
                1,
            )
            .to_paging_token(),
            i + 1
        );

        // Misc. timestamp to RFC 3339-formatted datetime nonsense, with an
        // absurd amount of verbosity because every edge case needs its own
        // chain of error-handling methods.
        //
        // Reference: https://stackoverflow.com/a/50072164
        let ts: i64 = ledger_info
            .timestamp
            .try_into()
            .map_err(|_e| Error::InvalidTimestamp {
                ts: ledger_info.timestamp.to_string(),
            })?;
        let ndt =
            NaiveDateTime::from_timestamp_opt(ts, 0).ok_or_else(|| Error::InvalidTimestamp {
                ts: ledger_info.timestamp.to_string(),
            })?;

        let dt: DateTime<Utc> = DateTime::from_utc(ndt, Utc);

        let cereal_event = rpc::Event {
            event_type: match contract_event.type_ {
                xdr::ContractEventType::Contract => "contract",
                xdr::ContractEventType::System => "system",
            }
            .to_string(),
            paging_token: id.clone(),
            id,
            ledger: ledger_info.sequence_number.to_string(),
            ledger_closed_at: dt.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
            contract_id: hex::encode(
                contract_event
                    .contract_id
                    .as_ref()
                    .unwrap_or(&xdr::Hash([0; 32])),
            ),
            topic: topics,
            value: rpc::EventValue {
                xdr: match &contract_event.body {
                    xdr::ContractEventBody::V0(e) => &e.data,
                }
                .to_xdr_base64()?,
            },
        };

        events.push(cereal_event);
    }

    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .write(true)
        .truncate(true)
        .open(output_file)?;

    serde_json::to_writer_pretty(&mut file, &events)?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use assert_fs::NamedTempFile;

    use super::*;

    /// Turns borrowed segments into the owned strings `does_topic_match`
    /// expects.
    fn owned(segments: &[&str]) -> Vec<String> {
        segments.iter().map(|segment| (*segment).to_string()).collect()
    }

    // Taken from [RPC server
    // tests](https://github.com/stellar/soroban-tools/blob/main/cmd/soroban-rpc/internal/methods/get_events_test.go#L21).
    #[test]
    fn test_does_topic_match() {
        struct TestCase<'a> {
            name: &'a str,
            filter: Vec<&'a str>,
            includes: Vec<Vec<&'a str>>,
            excludes: Vec<Vec<&'a str>>,
        }

        let xfer = "AAAABQAAAAh0cmFuc2Zlcg==";
        let number = "AAAAAQB6Mcc=";
        let star = "*";

        let cases = vec![
            // No filter means match nothing.
            TestCase {
                name: "<empty>",
                filter: vec![],
                includes: vec![],
                excludes: vec![vec![xfer]],
            },
            // "*" should match "transfer/" but not "transfer/transfer" or
            // "transfer/amount", because * is specified as a SINGLE segment
            // wildcard.
            TestCase {
                name: "*",
                filter: vec![star],
                includes: vec![vec![xfer]],
                excludes: vec![vec![xfer, xfer], vec![xfer, number]],
            },
            // "*/transfer" should match anything preceding "transfer", but
            // nothing that isn't exactly two segments long.
            TestCase {
                name: "*/transfer",
                filter: vec![star, xfer],
                includes: vec![vec![number, xfer], vec![xfer, xfer]],
                excludes: vec![
                    vec![number],
                    vec![number, number],
                    vec![number, xfer, number],
                    vec![xfer],
                    vec![xfer, number],
                    vec![xfer, xfer, xfer],
                ],
            },
            // The inverse case of before: "transfer/*" should match any single
            // segment after a segment that is exactly "transfer", but no
            // additional segments.
            TestCase {
                name: "transfer/*",
                filter: vec![xfer, star],
                includes: vec![vec![xfer, number], vec![xfer, xfer]],
                excludes: vec![
                    vec![number],
                    vec![number, number],
                    vec![number, xfer, number],
                    vec![xfer],
                    vec![number, xfer],
                    vec![xfer, xfer, xfer],
                ],
            },
            // Here, we extend to exactly two wild segments after transfer.
            TestCase {
                name: "transfer/*/*",
                filter: vec![xfer, star, star],
                includes: vec![vec![xfer, number, number], vec![xfer, xfer, xfer]],
                excludes: vec![
                    vec![number],
                    vec![number, number],
                    vec![number, xfer],
                    vec![number, xfer, number, number],
                    vec![xfer],
                    vec![xfer, xfer, xfer, xfer],
                ],
            },
            // Here, we ensure wildcards can be in the middle of a filter: only
            // exact matches happen on the ends, while the middle can be
            // anything.
            TestCase {
                name: "transfer/*/number",
                filter: vec![xfer, star, number],
                includes: vec![vec![xfer, number, number], vec![xfer, xfer, number]],
                excludes: vec![
                    vec![number],
                    vec![number, number],
                    vec![number, number, number],
                    vec![number, xfer, number],
                    vec![xfer],
                    vec![number, xfer],
                    vec![xfer, xfer, xfer],
                    vec![xfer, number, xfer],
                ],
            },
        ];

        for case in &cases {
            // The filter is the same for every topic of a case, so convert it
            // to owned strings only once.
            let filter = owned(&case.filter);

            for topic in &case.includes {
                assert!(
                    does_topic_match(&owned(topic), &filter),
                    "test: {}, topic ({:?}) should be matched by filter ({:?})",
                    case.name,
                    topic,
                    case.filter
                );
            }

            for topic in &case.excludes {
                assert!(
                    !does_topic_match(&owned(topic), &filter),
                    "test: {}, topic ({:?}) should NOT be matched by filter ({:?})",
                    case.name,
                    topic,
                    case.filter
                );
            }
        }
    }

    #[test]
    fn test_does_event_serialization_match() {
        let temp = NamedTempFile::new("events.json").unwrap();
        let store = temp.to_path_buf();

        // Make a couple of fake events with slightly different properties and
        // write them to disk, then read the serialized versions from disk and
        // ensure the properties match.

        // Builds a topic-less contract event whose contract ID consists of 32
        // copies of `id_byte` and whose payload is `data`.
        let contract_event = |id_byte: u8, data: xdr::ScVal| {
            events::HostEvent::Contract(xdr::ContractEvent {
                ext: xdr::ExtensionPoint::V0,
                contract_id: Some(xdr::Hash([id_byte; 32])),
                type_: xdr::ContractEventType::Contract,
                body: xdr::ContractEventBody::V0(xdr::ContractEventV0 {
                    topics: xdr::ScVec(vec![].try_into().unwrap()),
                    data,
                }),
            })
        };

        let host_events: Vec<events::HostEvent> = vec![
            contract_event(0, xdr::ScVal::U32(12345)),
            contract_event(0x1, xdr::ScVal::I32(67890)),
        ];

        let snapshot = soroban_ledger_snapshot::LedgerSnapshot {
            protocol_version: 1,
            sequence_number: 2, // this is the only value that matters
            timestamp: 3,
            network_passphrase: "four".into(),
            base_reserve: 5,
            ledger_entries: vec![],
        };

        commit(&host_events, &snapshot, &store).unwrap();

        let stored = read(&store).unwrap();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0].ledger, "2");
        assert_eq!(stored[1].ledger, "2");
        assert_eq!(stored[0].contract_id, "0".repeat(64));
        assert_eq!(stored[1].contract_id, "01".repeat(32));
    }
}