1use clap::Parser;
2use indexmap::IndexMap;
3use soroban_spec_tools::event::DecodedEvent;
4use soroban_spec_tools::{sanitize, Spec};
5use std::collections::HashMap;
6use std::io;
7
8use crate::xdr::{self, Limits, ReadXdr, ScVal};
9use crate::{
10 config::{self, locator, network},
11 get_spec::get_remote_contract_spec,
12 rpc,
13};
14
15#[derive(Parser, Debug, Clone)]
16#[group(skip)]
17pub struct Cmd {
18 #[allow(clippy::doc_markdown)]
19 #[arg(long, conflicts_with = "cursor", required_unless_present = "cursor")]
22 start_ledger: Option<u32>,
23
24 #[arg(
26 long,
27 conflicts_with = "start_ledger",
28 required_unless_present = "start_ledger"
29 )]
30 cursor: Option<String>,
31
32 #[arg(long, value_enum, default_value = "pretty")]
34 output: OutputFormat,
35
36 #[arg(short, long, default_value = "10")]
38 count: usize,
39
40 #[arg(
48 long = "id",
49 num_args = 1..=6,
50 help_heading = "FILTERS"
51 )]
52 contract_ids: Vec<config::UnresolvedContract>,
53
54 #[arg(
71 long = "topic",
72 num_args = 1.., help_heading = "FILTERS"
74 )]
75 topic_filters: Vec<String>,
76
77 #[arg(
79 long = "type",
80 value_enum,
81 default_value = "all",
82 help_heading = "FILTERS"
83 )]
84 event_type: rpc::EventType,
85
86 #[command(flatten)]
87 locator: locator::Args,
88
89 #[command(flatten)]
90 network: network::Args,
91}
92
93#[derive(thiserror::Error, Debug)]
94pub enum Error {
95 #[error("cursor is not valid")]
96 InvalidCursor,
97 #[error("filepath does not exist: {path}")]
98 InvalidFile { path: String },
99 #[error("filepath ({path}) cannot be read: {error}")]
100 CannotReadFile { path: String, error: String },
101 #[error("max of 5 topic filters allowed per request, received {filter_count}")]
102 MaxTopicFilters { filter_count: usize },
103 #[error("cannot parse topic filter {topic} into 1-4 segments")]
104 InvalidTopicFilter { topic: String },
105 #[error("invalid segment ({segment}) in topic filter ({topic}): {error}")]
106 InvalidSegment {
107 topic: String,
108 segment: String,
109 error: xdr::Error,
110 },
111 #[error("cannot parse contract ID {contract_id}: {error}")]
112 InvalidContractId {
113 contract_id: String,
114 error: stellar_strkey::DecodeError,
115 },
116 #[error("invalid JSON string: {error} ({debug})")]
117 InvalidJson {
118 debug: String,
119 error: serde_json::Error,
120 },
121 #[error("invalid timestamp in event: {ts}")]
122 InvalidTimestamp { ts: String },
123 #[error("missing start_ledger and cursor")]
124 MissingStartLedgerAndCursor,
125 #[error("missing target")]
126 MissingTarget,
127 #[error(transparent)]
128 Rpc(#[from] rpc::Error),
129 #[error(transparent)]
130 Generic(#[from] Box<dyn std::error::Error>),
131 #[error(transparent)]
132 Io(#[from] io::Error),
133 #[error(transparent)]
134 Xdr(#[from] xdr::Error),
135 #[error(transparent)]
136 Serde(#[from] serde_json::Error),
137 #[error(transparent)]
138 Network(#[from] network::Error),
139 #[error(transparent)]
140 Locator(#[from] locator::Error),
141 #[error(transparent)]
142 Config(#[from] config::Error),
143 #[error(transparent)]
144 GetSpec(#[from] crate::get_spec::Error),
145}
146
147#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, clap::ValueEnum)]
148pub enum OutputFormat {
149 Pretty,
151
152 Plain,
154
155 Json,
157
158 Raw,
160}
161
/// Contract specs keyed by contract ID string. A `None` value records that the
/// spec for that contract could not be obtained, so its events are not decoded.
type SpecCache = HashMap<String, Option<Spec>>;
164
165#[derive(serde::Serialize, Debug)]
175struct DecodedEventWithMetadata {
176 id: String,
177 ledger: u32,
178 ledger_closed_at: String,
179 #[serde(rename = "type")]
180 event_type: String,
181 contract_id: String,
182 event_name: String,
183 prefix_topics: Vec<String>,
184 params: IndexMap<String, serde_json::Value>,
185}
186
187impl Cmd {
188 pub async fn run(&mut self) -> Result<(), Error> {
189 let config = config::Args {
190 locator: self.locator.clone(),
191 network: self.network.clone(),
192 source_account: config::UnresolvedMuxedAccount::default(),
193 sign_with: config::sign_with::Args::default(),
194 fee: None,
195 inclusion_fee: None,
196 };
197 let response = self.execute(&config).await?;
198
199 if response.events.is_empty() {
200 eprintln!("No events");
201 return Ok(());
202 }
203
204 let spec_cache = if self.output == OutputFormat::Raw {
206 HashMap::new()
207 } else {
208 self.build_spec_cache(&response.events, &config).await
209 };
210
211 for event in &response.events {
212 let decoded = if self.output == OutputFormat::Raw {
213 None
214 } else {
215 Self::try_decode_event(event, &spec_cache)
216 };
217
218 match self.output {
219 OutputFormat::Pretty => {
220 if let Some(decoded) = decoded {
221 Self::print_decoded_event(&decoded, event, true)?;
222 } else {
223 event.pretty_print()?;
224 }
225 }
226 OutputFormat::Plain => {
227 if let Some(decoded) = decoded {
228 Self::print_decoded_event(&decoded, event, false)?;
229 } else {
230 println!("{event}");
231 }
232 }
233 OutputFormat::Json => {
234 if let Some(decoded) = decoded {
236 let with_metadata = DecodedEventWithMetadata {
237 id: event.id.clone(),
238 ledger: event.ledger,
239 ledger_closed_at: event.ledger_closed_at.clone(),
240 event_type: event.event_type.clone(),
241 contract_id: decoded.contract_id.clone(),
242 event_name: decoded.event_name.clone(),
243 prefix_topics: decoded.prefix_topics.clone(),
244 params: decoded.params.clone(),
245 };
246 println!(
247 "{}",
248 serde_json::to_string(&with_metadata).map_err(|e| {
249 Error::InvalidJson {
250 debug: format!("{with_metadata:#?}"),
251 error: e,
252 }
253 })?
254 );
255 } else {
256 println!(
257 "{}",
258 serde_json::to_string(&event).map_err(|e| {
259 Error::InvalidJson {
260 debug: format!("{event:#?}"),
261 error: e,
262 }
263 })?
264 );
265 }
266 }
267 OutputFormat::Raw => {
268 event.pretty_print()?;
269 }
270 }
271 }
272 Ok(())
273 }
274
275 async fn build_spec_cache(&self, events: &[rpc::Event], config: &config::Args) -> SpecCache {
277 let unique_ids: Vec<_> = events
279 .iter()
280 .map(|e| e.contract_id.clone())
281 .collect::<std::collections::HashSet<_>>()
282 .into_iter()
283 .collect();
284
285 let fetch_futures: Vec<_> = unique_ids
287 .iter()
288 .map(|id| Self::fetch_spec_for_contract(id, config))
289 .collect();
290
291 let results = futures::future::join_all(fetch_futures).await;
292
293 unique_ids.into_iter().zip(results).collect()
294 }
295
296 async fn fetch_spec_for_contract(contract_id_str: &str, config: &config::Args) -> Option<Spec> {
298 let contract_id = match stellar_strkey::Contract::from_string(contract_id_str) {
300 Ok(id) => id,
301 Err(e) => {
302 tracing::debug!("Failed to parse contract ID {contract_id_str}: {e}");
303 return None;
304 }
305 };
306
307 match get_remote_contract_spec(
308 &contract_id.0,
309 &config.locator,
310 &config.network,
311 None,
312 Some(config),
313 )
314 .await
315 {
316 Ok(spec_entries) => Some(Spec::new(&spec_entries)),
317 Err(e) => {
318 tracing::debug!(
319 "Failed to fetch spec for contract {contract_id_str}: {e}. Events from this contract will use raw format."
320 );
321 None
322 }
323 }
324 }
325
326 fn try_decode_event(event: &rpc::Event, spec_cache: &SpecCache) -> Option<DecodedEvent> {
328 let spec = spec_cache.get(&event.contract_id)?.as_ref()?;
329
330 let topics: Vec<ScVal> = event
332 .topic
333 .iter()
334 .filter_map(|t| ScVal::from_xdr_base64(t, Limits::none()).ok())
335 .collect();
336
337 if topics.len() != event.topic.len() {
338 return None; }
340
341 let data = ScVal::from_xdr_base64(&event.value, Limits::none()).ok()?;
343
344 spec.decode_event(&event.contract_id, &topics, &data)
345 .inspect_err(|e| tracing::debug!("Failed to decode event {}: {e}", event.id))
346 .ok()
347 }
348
349 fn print_decoded_event(
351 decoded: &DecodedEvent,
352 event: &rpc::Event,
353 use_colors: bool,
354 ) -> Result<(), Error> {
355 use termcolor::{ColorChoice, StandardStream};
356
357 let color_choice = if use_colors {
358 ColorChoice::Auto
359 } else {
360 ColorChoice::Never
361 };
362 let mut stdout = StandardStream::stdout(color_choice);
363 Self::write_decoded_event(&mut stdout, decoded, event)
364 }
365
366 fn write_decoded_event<W: termcolor::WriteColor>(
367 stdout: &mut W,
368 decoded: &DecodedEvent,
369 event: &rpc::Event,
370 ) -> Result<(), Error> {
371 use termcolor::{Color, ColorSpec};
372
373 stdout.set_color(ColorSpec::new().set_fg(Some(Color::Cyan)).set_bold(true))?;
375 write!(stdout, "Event")?;
376 stdout.reset()?;
377 writeln!(
378 stdout,
379 " {} [{}]:",
380 event.id,
381 event.event_type.to_uppercase()
382 )?;
383
384 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
386 write!(stdout, " Ledger: ")?;
387 stdout.reset()?;
388 writeln!(
389 stdout,
390 "{} (closed at {})",
391 event.ledger, event.ledger_closed_at
392 )?;
393
394 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
396 write!(stdout, " Contract: ")?;
397 stdout.reset()?;
398 writeln!(stdout, "{}", decoded.contract_id)?;
399
400 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
402 write!(stdout, " Event: ")?;
403 stdout.reset()?;
404 stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)).set_bold(true))?;
405 write!(stdout, "{}", sanitize(&decoded.event_name))?;
406 stdout.reset()?;
407 if !decoded.prefix_topics.is_empty() {
408 let prefix = decoded
409 .prefix_topics
410 .iter()
411 .map(|t| sanitize(t))
412 .collect::<Vec<_>>()
413 .join(", ");
414 write!(stdout, " ({prefix})")?;
415 }
416 writeln!(stdout)?;
417
418 if !decoded.params.is_empty() {
420 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
421 writeln!(stdout, " Params:")?;
422 stdout.reset()?;
423 for (name, value) in &decoded.params {
424 stdout.set_color(ColorSpec::new().set_fg(Some(Color::Yellow)))?;
425 write!(stdout, " {}", sanitize(name))?;
426 stdout.reset()?;
427 write!(stdout, ": ")?;
428 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)))?;
429 writeln!(stdout, "{value}")?;
430 stdout.reset()?;
431 }
432 }
433
434 writeln!(stdout)?;
435 Ok(())
436 }
437
438 pub async fn execute(&self, config: &config::Args) -> Result<rpc::GetEventsResponse, Error> {
439 let start = self.start()?;
440 let network = config.get_network()?;
441 let client = network.rpc_client()?;
442 client
443 .verify_network_passphrase(Some(&network.network_passphrase))
444 .await?;
445
446 let contract_ids: Vec<String> = self
447 .contract_ids
448 .iter()
449 .map(|id| {
450 Ok(format!(
451 "{}",
452 id.resolve_contract_id(&self.locator, &network.network_passphrase)?
453 ))
454 })
455 .collect::<Result<Vec<_>, Error>>()?;
456
457 let parsed_topics = self.parse_topics()?;
458
459 client
460 .get_events(
461 start,
462 Some(self.event_type),
463 &contract_ids,
464 &parsed_topics,
465 Some(self.count),
466 )
467 .await
468 .map_err(Error::Rpc)
469 }
470
471 fn parse_topics(&self) -> Result<Vec<rpc::TopicFilter>, Error> {
472 if self.topic_filters.len() > 5 {
473 return Err(Error::MaxTopicFilters {
474 filter_count: self.topic_filters.len(),
475 });
476 }
477 let mut topic_filters: Vec<rpc::TopicFilter> = Vec::new();
478 for topic in &self.topic_filters {
479 let mut topic_filter: rpc::TopicFilter = Vec::new(); for (i, segment) in topic.split(',').enumerate() {
481 if i > 4 {
482 return Err(Error::InvalidTopicFilter {
483 topic: topic.clone(),
484 });
485 }
486
487 if segment == "*" || segment == "**" {
488 topic_filter.push(segment.to_owned());
489 } else {
490 match xdr::ScVal::from_xdr_base64(segment, Limits::none()) {
491 Ok(_s) => {
492 topic_filter.push(segment.to_owned());
493 }
494 Err(e) => {
495 return Err(Error::InvalidSegment {
496 topic: topic.clone(),
497 segment: segment.to_string(),
498 error: e,
499 });
500 }
501 }
502 }
503 }
504 topic_filters.push(topic_filter);
505 }
506
507 Ok(topic_filters)
508 }
509
510 fn start(&self) -> Result<rpc::EventStart, Error> {
511 let start = match (self.start_ledger, self.cursor.clone()) {
512 (Some(start), _) => rpc::EventStart::Ledger(start),
513 (_, Some(c)) => rpc::EventStart::Cursor(c),
514 _ => return Err(Error::MissingStartLedgerAndCursor),
516 };
517 Ok(start)
518 }
519}
520
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use soroban_spec_tools::test_utils::assert_no_control_chars;
    use termcolor::Buffer;

    /// RPC event whose own fields are harmless; it only supplies the metadata
    /// lines of the rendered block.
    fn evil_event() -> rpc::Event {
        rpc::Event {
            event_type: String::from("contract"),
            ledger: 1,
            ledger_closed_at: String::from("2026-01-01T00:00:00Z"),
            contract_id: String::from("CACA"),
            id: String::from("0000000001-0000000001"),
            operation_index: None,
            transaction_index: None,
            tx_hash: None,
            #[allow(deprecated)]
            is_successful_contract_call: None,
            topic: Vec::new(),
            value: String::new(),
        }
    }

    /// Decoded event with ANSI escape sequences planted in the event name, a
    /// prefix topic and a parameter name.
    fn evil_decoded() -> DecodedEvent {
        let mut params = IndexMap::new();
        params.insert(String::from("amount\x1b[31m"), json!(1000));

        let prefix_topics = vec![String::from("\x1b[31mEVIL"), String::from("topic2")];

        DecodedEvent {
            contract_id: String::from("CACA"),
            event_name: String::from("\x1b[2J\x1b[Htransfer"),
            prefix_topics,
            params,
        }
    }

    /// Rendering into a colorless buffer must leave no control characters from
    /// the planted escape sequences in the output.
    #[test]
    fn write_decoded_event_strips_attacker_control_bytes() {
        let decoded = evil_decoded();
        let event = evil_event();
        let mut sink = Buffer::no_color();

        Cmd::write_decoded_event(&mut sink, &decoded, &event).unwrap();

        let rendered = String::from_utf8(sink.into_inner()).unwrap();
        assert_no_control_chars(&rendered);
    }
}
563}