Skip to main content

soroban_cli/commands/
events.rs

1use clap::Parser;
2use indexmap::IndexMap;
3use soroban_spec_tools::event::DecodedEvent;
4use soroban_spec_tools::Spec;
5use std::collections::HashMap;
6use std::io;
7
8use crate::xdr::{self, Limits, ReadXdr, ScVal};
9use crate::{
10    config::{self, locator, network},
11    get_spec::get_remote_contract_spec,
12    rpc,
13};
14
15#[derive(Parser, Debug, Clone)]
16#[group(skip)]
17pub struct Cmd {
18    #[allow(clippy::doc_markdown)]
19    /// The first ledger sequence number in the range to pull events
20    /// https://developers.stellar.org/docs/learn/encyclopedia/network-configuration/ledger-headers#ledger-sequence
21    #[arg(long, conflicts_with = "cursor", required_unless_present = "cursor")]
22    start_ledger: Option<u32>,
23
24    /// The cursor corresponding to the start of the event range.
25    #[arg(
26        long,
27        conflicts_with = "start_ledger",
28        required_unless_present = "start_ledger"
29    )]
30    cursor: Option<String>,
31
32    /// Output formatting options for event stream
33    #[arg(long, value_enum, default_value = "pretty")]
34    output: OutputFormat,
35
36    /// The maximum number of events to display (defer to the server-defined limit).
37    #[arg(short, long, default_value = "10")]
38    count: usize,
39
40    /// A set of (up to 5) contract IDs to filter events on. This parameter can
41    /// be passed multiple times, e.g. `--id C123.. --id C456..`, or passed with
42    /// multiple parameters, e.g. `--id C123 C456`.
43    ///
44    /// Though the specification supports multiple filter objects (i.e.
45    /// combinations of type, IDs, and topics), only one set can be specified on
46    /// the command-line today, though that set can have multiple IDs/topics.
47    #[arg(
48        long = "id",
49        num_args = 1..=6,
50        help_heading = "FILTERS"
51    )]
52    contract_ids: Vec<config::UnresolvedContract>,
53
54    /// A set of (up to 5) topic filters to filter event topics on. A single
55    /// topic filter can contain 1-4 different segments, separated by
56    /// commas. An asterisk (`*` character) indicates a wildcard segment.
57    ///
58    /// In addition to up to 4 possible topic filter segments, the "**" wildcard can also be added, and will allow for a flexible number of topics in the returned events. The "**" wildcard must be the last segment in a query.
59    ///
60    /// If the "**" wildcard is not included, only events with the exact number of topics as the given filter will be returned.
61    ///
62    /// **Example:** topic filter with two segments: `--topic "AAAABQAAAAdDT1VOVEVSAA==,*"`
63    ///
64    /// **Example:** two topic filters with one and two segments each: `--topic "AAAABQAAAAdDT1VOVEVSAA==" --topic '*,*'`
65    ///
66    /// **Example:** topic filter with four segments and the "**" wildcard: --topic "AAAABQAAAAdDT1VOVEVSAA==,*,*,*,**"
67    ///
68    /// Note that all of these topic filters are combined with the contract IDs
69    /// into a single filter (i.e. combination of type, IDs, and topics).
70    #[arg(
71        long = "topic",
72        num_args = 1.., // allowing 1+ arguments here, and doing additional validation in parse_topics
73        help_heading = "FILTERS"
74    )]
75    topic_filters: Vec<String>,
76
77    /// Specifies which type of contract events to display.
78    #[arg(
79        long = "type",
80        value_enum,
81        default_value = "all",
82        help_heading = "FILTERS"
83    )]
84    event_type: rpc::EventType,
85
86    #[command(flatten)]
87    locator: locator::Args,
88
89    #[command(flatten)]
90    network: network::Args,
91}
92
93#[derive(thiserror::Error, Debug)]
94pub enum Error {
95    #[error("cursor is not valid")]
96    InvalidCursor,
97    #[error("filepath does not exist: {path}")]
98    InvalidFile { path: String },
99    #[error("filepath ({path}) cannot be read: {error}")]
100    CannotReadFile { path: String, error: String },
101    #[error("max of 5 topic filters allowed per request, received {filter_count}")]
102    MaxTopicFilters { filter_count: usize },
103    #[error("cannot parse topic filter {topic} into 1-4 segments")]
104    InvalidTopicFilter { topic: String },
105    #[error("invalid segment ({segment}) in topic filter ({topic}): {error}")]
106    InvalidSegment {
107        topic: String,
108        segment: String,
109        error: xdr::Error,
110    },
111    #[error("cannot parse contract ID {contract_id}: {error}")]
112    InvalidContractId {
113        contract_id: String,
114        error: stellar_strkey::DecodeError,
115    },
116    #[error("invalid JSON string: {error} ({debug})")]
117    InvalidJson {
118        debug: String,
119        error: serde_json::Error,
120    },
121    #[error("invalid timestamp in event: {ts}")]
122    InvalidTimestamp { ts: String },
123    #[error("missing start_ledger and cursor")]
124    MissingStartLedgerAndCursor,
125    #[error("missing target")]
126    MissingTarget,
127    #[error(transparent)]
128    Rpc(#[from] rpc::Error),
129    #[error(transparent)]
130    Generic(#[from] Box<dyn std::error::Error>),
131    #[error(transparent)]
132    Io(#[from] io::Error),
133    #[error(transparent)]
134    Xdr(#[from] xdr::Error),
135    #[error(transparent)]
136    Serde(#[from] serde_json::Error),
137    #[error(transparent)]
138    Network(#[from] network::Error),
139    #[error(transparent)]
140    Locator(#[from] locator::Error),
141    #[error(transparent)]
142    Config(#[from] config::Error),
143    #[error(transparent)]
144    GetSpec(#[from] crate::get_spec::Error),
145}
146
147#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, clap::ValueEnum)]
148pub enum OutputFormat {
149    /// Human-readable output with decoded event names and parameters
150    Pretty,
151
152    /// Human-readable output without colors
153    Plain,
154
155    /// JSON output with decoded event names and parameters
156    Json,
157
158    /// Raw event output without self-describing decoding
159    Raw,
160}
161
162/// Cache for contract specs, keyed by contract ID
163type SpecCache = HashMap<String, Option<Spec>>;
164
165/// Decoded event with metadata for JSON output.
166///
167/// This is intentionally a different schema from the raw `rpc::Event` format,
168/// focused on providing decoded event data with named parameters. Key differences:
169/// - `event_name`: The decoded event name from the contract spec (e.g., "Transfer")
170/// - `params`: Named parameters decoded using the contract spec
171///
172/// For the raw event format with all original fields (topics, value as base64 XDR),
173/// use `--output raw`.
174#[derive(serde::Serialize, Debug)]
175struct DecodedEventWithMetadata {
176    id: String,
177    ledger: u32,
178    ledger_closed_at: String,
179    #[serde(rename = "type")]
180    event_type: String,
181    contract_id: String,
182    event_name: String,
183    prefix_topics: Vec<String>,
184    params: IndexMap<String, serde_json::Value>,
185}
186
187impl Cmd {
188    pub async fn run(&mut self) -> Result<(), Error> {
189        let config = config::Args {
190            locator: self.locator.clone(),
191            network: self.network.clone(),
192            source_account: config::UnresolvedMuxedAccount::default(),
193            sign_with: config::sign_with::Args::default(),
194            fee: None,
195            inclusion_fee: None,
196        };
197        let response = self.execute(&config).await?;
198
199        if response.events.is_empty() {
200            eprintln!("No events");
201            return Ok(());
202        }
203
204        // Build spec cache for decoded output formats (not raw)
205        let spec_cache = if self.output == OutputFormat::Raw {
206            HashMap::new()
207        } else {
208            self.build_spec_cache(&response.events, &config).await
209        };
210
211        for event in &response.events {
212            let decoded = if self.output == OutputFormat::Raw {
213                None
214            } else {
215                Self::try_decode_event(event, &spec_cache)
216            };
217
218            match self.output {
219                OutputFormat::Pretty => {
220                    if let Some(decoded) = decoded {
221                        Self::print_decoded_event(&decoded, event, true)?;
222                    } else {
223                        event.pretty_print()?;
224                    }
225                }
226                OutputFormat::Plain => {
227                    if let Some(decoded) = decoded {
228                        Self::print_decoded_event(&decoded, event, false)?;
229                    } else {
230                        println!("{event}");
231                    }
232                }
233                OutputFormat::Json => {
234                    // Single-line JSON (NDJSON) for streaming processing
235                    if let Some(decoded) = decoded {
236                        let with_metadata = DecodedEventWithMetadata {
237                            id: event.id.clone(),
238                            ledger: event.ledger,
239                            ledger_closed_at: event.ledger_closed_at.clone(),
240                            event_type: event.event_type.clone(),
241                            contract_id: decoded.contract_id.clone(),
242                            event_name: decoded.event_name.clone(),
243                            prefix_topics: decoded.prefix_topics.clone(),
244                            params: decoded.params.clone(),
245                        };
246                        println!(
247                            "{}",
248                            serde_json::to_string(&with_metadata).map_err(|e| {
249                                Error::InvalidJson {
250                                    debug: format!("{with_metadata:#?}"),
251                                    error: e,
252                                }
253                            })?
254                        );
255                    } else {
256                        println!(
257                            "{}",
258                            serde_json::to_string(&event).map_err(|e| {
259                                Error::InvalidJson {
260                                    debug: format!("{event:#?}"),
261                                    error: e,
262                                }
263                            })?
264                        );
265                    }
266                }
267                OutputFormat::Raw => {
268                    event.pretty_print()?;
269                }
270            }
271        }
272        Ok(())
273    }
274
275    /// Build a cache of contract specs for the unique contract IDs in the events
276    async fn build_spec_cache(&self, events: &[rpc::Event], config: &config::Args) -> SpecCache {
277        // Collect unique contract IDs
278        let unique_ids: Vec<_> = events
279            .iter()
280            .map(|e| e.contract_id.clone())
281            .collect::<std::collections::HashSet<_>>()
282            .into_iter()
283            .collect();
284
285        // Fetch specs concurrently
286        let fetch_futures: Vec<_> = unique_ids
287            .iter()
288            .map(|id| Self::fetch_spec_for_contract(id, config))
289            .collect();
290
291        let results = futures::future::join_all(fetch_futures).await;
292
293        unique_ids.into_iter().zip(results).collect()
294    }
295
296    /// Fetch the spec for a single contract, returning None on failure
297    async fn fetch_spec_for_contract(contract_id_str: &str, config: &config::Args) -> Option<Spec> {
298        // Parse contract ID from string
299        let contract_id = match stellar_strkey::Contract::from_string(contract_id_str) {
300            Ok(id) => id,
301            Err(e) => {
302                tracing::debug!("Failed to parse contract ID {contract_id_str}: {e}");
303                return None;
304            }
305        };
306
307        match get_remote_contract_spec(
308            &contract_id.0,
309            &config.locator,
310            &config.network,
311            None,
312            Some(config),
313        )
314        .await
315        {
316            Ok(spec_entries) => Some(Spec::new(&spec_entries)),
317            Err(e) => {
318                tracing::debug!(
319                    "Failed to fetch spec for contract {contract_id_str}: {e}. Events from this contract will use raw format."
320                );
321                None
322            }
323        }
324    }
325
326    /// Try to decode an event using the spec cache
327    fn try_decode_event(event: &rpc::Event, spec_cache: &SpecCache) -> Option<DecodedEvent> {
328        let spec = spec_cache.get(&event.contract_id)?.as_ref()?;
329
330        // Decode topics from base64 XDR
331        let topics: Vec<ScVal> = event
332            .topic
333            .iter()
334            .filter_map(|t| ScVal::from_xdr_base64(t, Limits::none()).ok())
335            .collect();
336
337        if topics.len() != event.topic.len() {
338            return None; // Failed to decode some topics
339        }
340
341        // Decode value from base64 XDR
342        let data = ScVal::from_xdr_base64(&event.value, Limits::none()).ok()?;
343
344        spec.decode_event(&event.contract_id, &topics, &data)
345            .inspect_err(|e| tracing::debug!("Failed to decode event {}: {e}", event.id))
346            .ok()
347    }
348
349    /// Print a decoded event (with colors if use_colors is true, auto-detected for Pretty)
350    fn print_decoded_event(
351        decoded: &DecodedEvent,
352        event: &rpc::Event,
353        use_colors: bool,
354    ) -> Result<(), Error> {
355        use std::io::Write;
356        use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
357
358        let color_choice = if use_colors {
359            ColorChoice::Auto
360        } else {
361            ColorChoice::Never
362        };
363        let mut stdout = StandardStream::stdout(color_choice);
364
365        // Event header
366        stdout.set_color(ColorSpec::new().set_fg(Some(Color::Cyan)).set_bold(true))?;
367        write!(stdout, "Event")?;
368        stdout.reset()?;
369        writeln!(
370            stdout,
371            " {} [{}]:",
372            event.id,
373            event.event_type.to_uppercase()
374        )?;
375
376        // Ledger info
377        stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
378        write!(stdout, "  Ledger:   ")?;
379        stdout.reset()?;
380        writeln!(
381            stdout,
382            "{} (closed at {})",
383            event.ledger, event.ledger_closed_at
384        )?;
385
386        // Contract
387        stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
388        write!(stdout, "  Contract: ")?;
389        stdout.reset()?;
390        writeln!(stdout, "{}", decoded.contract_id)?;
391
392        // Event name with prefix topics
393        stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
394        write!(stdout, "  Event:    ")?;
395        stdout.reset()?;
396        stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)).set_bold(true))?;
397        write!(stdout, "{}", decoded.event_name)?;
398        stdout.reset()?;
399        if !decoded.prefix_topics.is_empty() {
400            write!(stdout, " ({})", decoded.prefix_topics.join(", "))?;
401        }
402        writeln!(stdout)?;
403
404        // Params
405        if !decoded.params.is_empty() {
406            stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
407            writeln!(stdout, "  Params:")?;
408            stdout.reset()?;
409            for (name, value) in &decoded.params {
410                stdout.set_color(ColorSpec::new().set_fg(Some(Color::Yellow)))?;
411                write!(stdout, "    {name}")?;
412                stdout.reset()?;
413                write!(stdout, ": ")?;
414                stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)))?;
415                writeln!(stdout, "{value}")?;
416                stdout.reset()?;
417            }
418        }
419
420        writeln!(stdout)?;
421        Ok(())
422    }
423
424    pub async fn execute(&self, config: &config::Args) -> Result<rpc::GetEventsResponse, Error> {
425        let start = self.start()?;
426        let network = config.get_network()?;
427        let client = network.rpc_client()?;
428        client
429            .verify_network_passphrase(Some(&network.network_passphrase))
430            .await?;
431
432        let contract_ids: Vec<String> = self
433            .contract_ids
434            .iter()
435            .map(|id| {
436                Ok(id
437                    .resolve_contract_id(&self.locator, &network.network_passphrase)?
438                    .to_string())
439            })
440            .collect::<Result<Vec<_>, Error>>()?;
441
442        let parsed_topics = self.parse_topics()?;
443
444        client
445            .get_events(
446                start,
447                Some(self.event_type),
448                &contract_ids,
449                &parsed_topics,
450                Some(self.count),
451            )
452            .await
453            .map_err(Error::Rpc)
454    }
455
456    fn parse_topics(&self) -> Result<Vec<rpc::TopicFilter>, Error> {
457        if self.topic_filters.len() > 5 {
458            return Err(Error::MaxTopicFilters {
459                filter_count: self.topic_filters.len(),
460            });
461        }
462        let mut topic_filters: Vec<rpc::TopicFilter> = Vec::new();
463        for topic in &self.topic_filters {
464            let mut topic_filter: rpc::TopicFilter = Vec::new(); // a topic filter is a collection of segments
465            for (i, segment) in topic.split(',').enumerate() {
466                if i > 4 {
467                    return Err(Error::InvalidTopicFilter {
468                        topic: topic.clone(),
469                    });
470                }
471
472                if segment == "*" || segment == "**" {
473                    topic_filter.push(segment.to_owned());
474                } else {
475                    match xdr::ScVal::from_xdr_base64(segment, Limits::none()) {
476                        Ok(_s) => {
477                            topic_filter.push(segment.to_owned());
478                        }
479                        Err(e) => {
480                            return Err(Error::InvalidSegment {
481                                topic: topic.clone(),
482                                segment: segment.to_string(),
483                                error: e,
484                            });
485                        }
486                    }
487                }
488            }
489            topic_filters.push(topic_filter);
490        }
491
492        Ok(topic_filters)
493    }
494
495    fn start(&self) -> Result<rpc::EventStart, Error> {
496        let start = match (self.start_ledger, self.cursor.clone()) {
497            (Some(start), _) => rpc::EventStart::Ledger(start),
498            (_, Some(c)) => rpc::EventStart::Cursor(c),
499            // should never happen because of required_unless_present flags
500            _ => return Err(Error::MissingStartLedgerAndCursor),
501        };
502        Ok(start)
503    }
504}