Skip to main content

soroban_cli/commands/
events.rs

1use clap::Parser;
2use indexmap::IndexMap;
3use soroban_spec_tools::event::DecodedEvent;
4use soroban_spec_tools::{sanitize, Spec};
5use std::collections::HashMap;
6use std::io;
7
8use crate::xdr::{self, Limits, ReadXdr, ScVal};
9use crate::{
10    config::{self, locator, network},
11    get_spec::get_remote_contract_spec,
12    rpc,
13};
14
15#[derive(Parser, Debug, Clone)]
16#[group(skip)]
17pub struct Cmd {
18    #[allow(clippy::doc_markdown)]
19    /// The first ledger sequence number in the range to pull events
20    /// https://developers.stellar.org/docs/learn/encyclopedia/network-configuration/ledger-headers#ledger-sequence
21    #[arg(long, conflicts_with = "cursor", required_unless_present = "cursor")]
22    start_ledger: Option<u32>,
23
24    /// The cursor corresponding to the start of the event range.
25    #[arg(
26        long,
27        conflicts_with = "start_ledger",
28        required_unless_present = "start_ledger"
29    )]
30    cursor: Option<String>,
31
32    /// Output formatting options for event stream
33    #[arg(long, value_enum, default_value = "pretty")]
34    output: OutputFormat,
35
36    /// The maximum number of events to display (defer to the server-defined limit).
37    #[arg(short, long, default_value = "10")]
38    count: usize,
39
40    /// A set of (up to 5) contract IDs to filter events on. This parameter can
41    /// be passed multiple times, e.g. `--id C123.. --id C456..`, or passed with
42    /// multiple parameters, e.g. `--id C123 C456`.
43    ///
44    /// Though the specification supports multiple filter objects (i.e.
45    /// combinations of type, IDs, and topics), only one set can be specified on
46    /// the command-line today, though that set can have multiple IDs/topics.
47    #[arg(
48        long = "id",
49        num_args = 1..=6,
50        help_heading = "FILTERS"
51    )]
52    contract_ids: Vec<config::UnresolvedContract>,
53
54    /// A set of (up to 5) topic filters to filter event topics on. A single
55    /// topic filter can contain 1-4 different segments, separated by
56    /// commas. An asterisk (`*` character) indicates a wildcard segment.
57    ///
58    /// In addition to up to 4 possible topic filter segments, the "**" wildcard can also be added, and will allow for a flexible number of topics in the returned events. The "**" wildcard must be the last segment in a query.
59    ///
60    /// If the "**" wildcard is not included, only events with the exact number of topics as the given filter will be returned.
61    ///
62    /// **Example:** topic filter with two segments: `--topic "AAAABQAAAAdDT1VOVEVSAA==,*"`
63    ///
64    /// **Example:** two topic filters with one and two segments each: `--topic "AAAABQAAAAdDT1VOVEVSAA==" --topic '*,*'`
65    ///
66    /// **Example:** topic filter with four segments and the "**" wildcard: --topic "AAAABQAAAAdDT1VOVEVSAA==,*,*,*,**"
67    ///
68    /// Note that all of these topic filters are combined with the contract IDs
69    /// into a single filter (i.e. combination of type, IDs, and topics).
70    #[arg(
71        long = "topic",
72        num_args = 1.., // allowing 1+ arguments here, and doing additional validation in parse_topics
73        help_heading = "FILTERS"
74    )]
75    topic_filters: Vec<String>,
76
77    /// Specifies which type of contract events to display.
78    #[arg(
79        long = "type",
80        value_enum,
81        default_value = "all",
82        help_heading = "FILTERS"
83    )]
84    event_type: rpc::EventType,
85
86    #[command(flatten)]
87    locator: locator::Args,
88
89    #[command(flatten)]
90    network: network::Args,
91}
92
/// Errors that the `events` command can return.
///
/// The first group of variants carries command-specific context; the
/// `#[error(transparent)]` variants at the end wrap errors from other modules
/// and crates so they can be propagated with `?`.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// A supplied cursor was rejected as invalid.
    #[error("cursor is not valid")]
    InvalidCursor,
    /// A referenced file path does not exist.
    #[error("filepath does not exist: {path}")]
    InvalidFile { path: String },
    /// A referenced file exists but could not be read.
    #[error("filepath ({path}) cannot be read: {error}")]
    CannotReadFile { path: String, error: String },
    /// More than five `--topic` filters were supplied.
    #[error("max of 5 topic filters allowed per request, received {filter_count}")]
    MaxTopicFilters { filter_count: usize },
    /// A `--topic` filter does not have an acceptable number of segments.
    #[error("cannot parse topic filter {topic} into 1-4 segments")]
    InvalidTopicFilter { topic: String },
    /// A non-wildcard segment of a `--topic` filter is not valid base64 XDR
    /// for an `ScVal`.
    #[error("invalid segment ({segment}) in topic filter ({topic}): {error}")]
    InvalidSegment {
        topic: String,
        segment: String,
        error: xdr::Error,
    },
    /// A contract ID string could not be decoded as a strkey.
    #[error("cannot parse contract ID {contract_id}: {error}")]
    InvalidContractId {
        contract_id: String,
        error: stellar_strkey::DecodeError,
    },
    /// Serializing an event to JSON failed; `debug` holds the `Debug`
    /// rendering of the value that could not be serialized.
    #[error("invalid JSON string: {error} ({debug})")]
    InvalidJson {
        debug: String,
        error: serde_json::Error,
    },
    /// An event carried a timestamp that could not be interpreted.
    #[error("invalid timestamp in event: {ts}")]
    InvalidTimestamp { ts: String },
    /// Neither `--start-ledger` nor `--cursor` was provided. The argument
    /// parser normally prevents this, so it is a defensive fallback.
    #[error("missing start_ledger and cursor")]
    MissingStartLedgerAndCursor,
    /// No target was specified.
    #[error("missing target")]
    MissingTarget,
    /// Failure reported by the RPC client.
    #[error(transparent)]
    Rpc(#[from] rpc::Error),
    /// Any boxed error without a more specific variant.
    // NOTE(review): presumably what `rpc::Event::pretty_print` returns — confirm.
    #[error(transparent)]
    Generic(#[from] Box<dyn std::error::Error>),
    /// I/O failure, e.g. while writing formatted output.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// XDR encoding/decoding failure.
    #[error(transparent)]
    Xdr(#[from] xdr::Error),
    /// JSON (de)serialization failure without additional context.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    /// Failure resolving or contacting the configured network.
    #[error(transparent)]
    Network(#[from] network::Error),
    /// Failure locating configuration on disk.
    #[error(transparent)]
    Locator(#[from] locator::Error),
    /// Failure originating in the shared `config` module.
    #[error(transparent)]
    Config(#[from] config::Error),
    /// Failure fetching a contract spec.
    #[error(transparent)]
    GetSpec(#[from] crate::get_spec::Error),
}
146
// Output styles selectable with `--output`.
//
// The `///` comments on the variants are surfaced by clap as help text, so
// maintainer notes are kept in `//` comments.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, clap::ValueEnum)]
pub enum OutputFormat {
    // Decoded events are written with automatic color detection; events that
    // cannot be decoded fall back to `rpc::Event::pretty_print`.
    /// Human-readable output with decoded event names and parameters
    Pretty,

    // Same layout as `Pretty` with colors disabled; events that cannot be
    // decoded are printed through the event's `Display` impl.
    /// Human-readable output without colors
    Plain,

    // One JSON object per line. Decoded events use the
    // `DecodedEventWithMetadata` schema; others are serialized as the raw event.
    /// JSON output with decoded event names and parameters
    Json,

    // No contract specs are fetched; every event goes through `pretty_print`.
    /// Raw event output without self-describing decoding
    Raw,
}
161
/// Cache for contract specs, keyed by contract ID.
///
/// A `None` value records that the spec for that contract could not be
/// obtained, so its events are shown without spec-based decoding.
type SpecCache = HashMap<String, Option<Spec>>;
164
/// Decoded event with metadata for JSON output.
///
/// This is intentionally a different schema from the raw `rpc::Event` format,
/// focused on providing decoded event data with named parameters. Key differences:
/// - `event_name`: The decoded event name from the contract spec (e.g., "Transfer")
/// - `params`: Named parameters decoded using the contract spec
///
/// For the raw event format with all original fields (topics, value as base64 XDR),
/// use `--output raw`.
#[derive(serde::Serialize, Debug)]
struct DecodedEventWithMetadata {
    /// Event ID, copied from the RPC event.
    id: String,
    /// Ledger number, copied from the RPC event.
    ledger: u32,
    /// Ledger close time string, copied verbatim from the RPC event.
    ledger_closed_at: String,
    /// Event type string from the RPC event; serialized under the key `type`.
    #[serde(rename = "type")]
    event_type: String,
    /// Contract ID as reported by the decoder.
    contract_id: String,
    /// Event name resolved by the decoder.
    event_name: String,
    /// Prefix topics reported by the decoder.
    prefix_topics: Vec<String>,
    /// Decoded parameters keyed by name, kept in the decoder's order.
    params: IndexMap<String, serde_json::Value>,
}
186
187impl Cmd {
188    pub async fn run(&mut self) -> Result<(), Error> {
189        let config = config::Args {
190            locator: self.locator.clone(),
191            network: self.network.clone(),
192            source_account: config::UnresolvedMuxedAccount::default(),
193            sign_with: config::sign_with::Args::default(),
194            fee: None,
195            inclusion_fee: None,
196        };
197        let response = self.execute(&config).await?;
198
199        if response.events.is_empty() {
200            eprintln!("No events");
201            return Ok(());
202        }
203
204        // Build spec cache for decoded output formats (not raw)
205        let spec_cache = if self.output == OutputFormat::Raw {
206            HashMap::new()
207        } else {
208            self.build_spec_cache(&response.events, &config).await
209        };
210
211        for event in &response.events {
212            let decoded = if self.output == OutputFormat::Raw {
213                None
214            } else {
215                Self::try_decode_event(event, &spec_cache)
216            };
217
218            match self.output {
219                OutputFormat::Pretty => {
220                    if let Some(decoded) = decoded {
221                        Self::print_decoded_event(&decoded, event, true)?;
222                    } else {
223                        event.pretty_print()?;
224                    }
225                }
226                OutputFormat::Plain => {
227                    if let Some(decoded) = decoded {
228                        Self::print_decoded_event(&decoded, event, false)?;
229                    } else {
230                        println!("{event}");
231                    }
232                }
233                OutputFormat::Json => {
234                    // Single-line JSON (NDJSON) for streaming processing
235                    if let Some(decoded) = decoded {
236                        let with_metadata = DecodedEventWithMetadata {
237                            id: event.id.clone(),
238                            ledger: event.ledger,
239                            ledger_closed_at: event.ledger_closed_at.clone(),
240                            event_type: event.event_type.clone(),
241                            contract_id: decoded.contract_id.clone(),
242                            event_name: decoded.event_name.clone(),
243                            prefix_topics: decoded.prefix_topics.clone(),
244                            params: decoded.params.clone(),
245                        };
246                        println!(
247                            "{}",
248                            serde_json::to_string(&with_metadata).map_err(|e| {
249                                Error::InvalidJson {
250                                    debug: format!("{with_metadata:#?}"),
251                                    error: e,
252                                }
253                            })?
254                        );
255                    } else {
256                        println!(
257                            "{}",
258                            serde_json::to_string(&event).map_err(|e| {
259                                Error::InvalidJson {
260                                    debug: format!("{event:#?}"),
261                                    error: e,
262                                }
263                            })?
264                        );
265                    }
266                }
267                OutputFormat::Raw => {
268                    event.pretty_print()?;
269                }
270            }
271        }
272        Ok(())
273    }
274
275    /// Build a cache of contract specs for the unique contract IDs in the events
276    async fn build_spec_cache(&self, events: &[rpc::Event], config: &config::Args) -> SpecCache {
277        // Collect unique contract IDs
278        let unique_ids: Vec<_> = events
279            .iter()
280            .map(|e| e.contract_id.clone())
281            .collect::<std::collections::HashSet<_>>()
282            .into_iter()
283            .collect();
284
285        // Fetch specs concurrently
286        let fetch_futures: Vec<_> = unique_ids
287            .iter()
288            .map(|id| Self::fetch_spec_for_contract(id, config))
289            .collect();
290
291        let results = futures::future::join_all(fetch_futures).await;
292
293        unique_ids.into_iter().zip(results).collect()
294    }
295
296    /// Fetch the spec for a single contract, returning None on failure
297    async fn fetch_spec_for_contract(contract_id_str: &str, config: &config::Args) -> Option<Spec> {
298        // Parse contract ID from string
299        let contract_id = match stellar_strkey::Contract::from_string(contract_id_str) {
300            Ok(id) => id,
301            Err(e) => {
302                tracing::debug!("Failed to parse contract ID {contract_id_str}: {e}");
303                return None;
304            }
305        };
306
307        match get_remote_contract_spec(
308            &contract_id.0,
309            &config.locator,
310            &config.network,
311            None,
312            Some(config),
313        )
314        .await
315        {
316            Ok(spec_entries) => Some(Spec::new(&spec_entries)),
317            Err(e) => {
318                tracing::debug!(
319                    "Failed to fetch spec for contract {contract_id_str}: {e}. Events from this contract will use raw format."
320                );
321                None
322            }
323        }
324    }
325
326    /// Try to decode an event using the spec cache
327    fn try_decode_event(event: &rpc::Event, spec_cache: &SpecCache) -> Option<DecodedEvent> {
328        let spec = spec_cache.get(&event.contract_id)?.as_ref()?;
329
330        // Decode topics from base64 XDR
331        let topics: Vec<ScVal> = event
332            .topic
333            .iter()
334            .filter_map(|t| ScVal::from_xdr_base64(t, Limits::none()).ok())
335            .collect();
336
337        if topics.len() != event.topic.len() {
338            return None; // Failed to decode some topics
339        }
340
341        // Decode value from base64 XDR
342        let data = ScVal::from_xdr_base64(&event.value, Limits::none()).ok()?;
343
344        spec.decode_event(&event.contract_id, &topics, &data)
345            .inspect_err(|e| tracing::debug!("Failed to decode event {}: {e}", event.id))
346            .ok()
347    }
348
349    /// Print a decoded event (with colors if use_colors is true, auto-detected for Pretty)
350    fn print_decoded_event(
351        decoded: &DecodedEvent,
352        event: &rpc::Event,
353        use_colors: bool,
354    ) -> Result<(), Error> {
355        use termcolor::{ColorChoice, StandardStream};
356
357        let color_choice = if use_colors {
358            ColorChoice::Auto
359        } else {
360            ColorChoice::Never
361        };
362        let mut stdout = StandardStream::stdout(color_choice);
363        Self::write_decoded_event(&mut stdout, decoded, event)
364    }
365
366    fn write_decoded_event<W: termcolor::WriteColor>(
367        stdout: &mut W,
368        decoded: &DecodedEvent,
369        event: &rpc::Event,
370    ) -> Result<(), Error> {
371        use termcolor::{Color, ColorSpec};
372
373        // Event header
374        stdout.set_color(ColorSpec::new().set_fg(Some(Color::Cyan)).set_bold(true))?;
375        write!(stdout, "Event")?;
376        stdout.reset()?;
377        writeln!(
378            stdout,
379            " {} [{}]:",
380            event.id,
381            event.event_type.to_uppercase()
382        )?;
383
384        // Ledger info
385        stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
386        write!(stdout, "  Ledger:   ")?;
387        stdout.reset()?;
388        writeln!(
389            stdout,
390            "{} (closed at {})",
391            event.ledger, event.ledger_closed_at
392        )?;
393
394        // Contract
395        stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
396        write!(stdout, "  Contract: ")?;
397        stdout.reset()?;
398        writeln!(stdout, "{}", decoded.contract_id)?;
399
400        // Event name with prefix topics
401        stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
402        write!(stdout, "  Event:    ")?;
403        stdout.reset()?;
404        stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)).set_bold(true))?;
405        write!(stdout, "{}", sanitize(&decoded.event_name))?;
406        stdout.reset()?;
407        if !decoded.prefix_topics.is_empty() {
408            let prefix = decoded
409                .prefix_topics
410                .iter()
411                .map(|t| sanitize(t))
412                .collect::<Vec<_>>()
413                .join(", ");
414            write!(stdout, " ({prefix})")?;
415        }
416        writeln!(stdout)?;
417
418        // Params
419        if !decoded.params.is_empty() {
420            stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
421            writeln!(stdout, "  Params:")?;
422            stdout.reset()?;
423            for (name, value) in &decoded.params {
424                stdout.set_color(ColorSpec::new().set_fg(Some(Color::Yellow)))?;
425                write!(stdout, "    {}", sanitize(name))?;
426                stdout.reset()?;
427                write!(stdout, ": ")?;
428                stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)))?;
429                writeln!(stdout, "{value}")?;
430                stdout.reset()?;
431            }
432        }
433
434        writeln!(stdout)?;
435        Ok(())
436    }
437
438    pub async fn execute(&self, config: &config::Args) -> Result<rpc::GetEventsResponse, Error> {
439        let start = self.start()?;
440        let network = config.get_network()?;
441        let client = network.rpc_client()?;
442        client
443            .verify_network_passphrase(Some(&network.network_passphrase))
444            .await?;
445
446        let contract_ids: Vec<String> = self
447            .contract_ids
448            .iter()
449            .map(|id| {
450                Ok(format!(
451                    "{}",
452                    id.resolve_contract_id(&self.locator, &network.network_passphrase)?
453                ))
454            })
455            .collect::<Result<Vec<_>, Error>>()?;
456
457        let parsed_topics = self.parse_topics()?;
458
459        client
460            .get_events(
461                start,
462                Some(self.event_type),
463                &contract_ids,
464                &parsed_topics,
465                Some(self.count),
466            )
467            .await
468            .map_err(Error::Rpc)
469    }
470
471    fn parse_topics(&self) -> Result<Vec<rpc::TopicFilter>, Error> {
472        if self.topic_filters.len() > 5 {
473            return Err(Error::MaxTopicFilters {
474                filter_count: self.topic_filters.len(),
475            });
476        }
477        let mut topic_filters: Vec<rpc::TopicFilter> = Vec::new();
478        for topic in &self.topic_filters {
479            let mut topic_filter: rpc::TopicFilter = Vec::new(); // a topic filter is a collection of segments
480            for (i, segment) in topic.split(',').enumerate() {
481                if i > 4 {
482                    return Err(Error::InvalidTopicFilter {
483                        topic: topic.clone(),
484                    });
485                }
486
487                if segment == "*" || segment == "**" {
488                    topic_filter.push(segment.to_owned());
489                } else {
490                    match xdr::ScVal::from_xdr_base64(segment, Limits::none()) {
491                        Ok(_s) => {
492                            topic_filter.push(segment.to_owned());
493                        }
494                        Err(e) => {
495                            return Err(Error::InvalidSegment {
496                                topic: topic.clone(),
497                                segment: segment.to_string(),
498                                error: e,
499                            });
500                        }
501                    }
502                }
503            }
504            topic_filters.push(topic_filter);
505        }
506
507        Ok(topic_filters)
508    }
509
510    fn start(&self) -> Result<rpc::EventStart, Error> {
511        let start = match (self.start_ledger, self.cursor.clone()) {
512            (Some(start), _) => rpc::EventStart::Ledger(start),
513            (_, Some(c)) => rpc::EventStart::Cursor(c),
514            // should never happen because of required_unless_present flags
515            _ => return Err(Error::MissingStartLedgerAndCursor),
516        };
517        Ok(start)
518    }
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use soroban_spec_tools::test_utils::assert_no_control_chars;
    use termcolor::Buffer;

    /// Minimal RPC event with no topics and an empty value; its own fields
    /// contain no control characters.
    fn evil_event() -> rpc::Event {
        rpc::Event {
            id: "0000000001-0000000001".into(),
            event_type: "contract".into(),
            contract_id: "CACA".into(),
            ledger: 1,
            ledger_closed_at: "2026-01-01T00:00:00Z".into(),
            topic: Vec::new(),
            value: String::new(),
            operation_index: None,
            transaction_index: None,
            tx_hash: None,
            #[allow(deprecated)]
            is_successful_contract_call: None,
        }
    }

    /// Decoded event whose name, first prefix topic and parameter name all
    /// carry ANSI escape sequences.
    fn evil_decoded() -> DecodedEvent {
        let mut params = IndexMap::new();
        params.insert(String::from("amount\x1b[31m"), json!(1000));

        let prefix_topics = vec![String::from("\x1b[31mEVIL"), String::from("topic2")];

        DecodedEvent {
            contract_id: String::from("CACA"),
            event_name: String::from("\x1b[2J\x1b[Htransfer"),
            prefix_topics,
            params,
        }
    }

    /// Rendering a hostile decoded event must not emit control characters.
    #[test]
    fn write_decoded_event_strips_attacker_control_bytes() {
        let mut sink = Buffer::no_color();
        Cmd::write_decoded_event(&mut sink, &evil_decoded(), &evil_event()).unwrap();

        let rendered = String::from_utf8(sink.into_inner()).unwrap();
        assert_no_control_chars(&rendered);
    }
}