1use clap::Parser;
2use indexmap::IndexMap;
3use soroban_spec_tools::event::DecodedEvent;
4use soroban_spec_tools::Spec;
5use std::collections::HashMap;
6use std::io;
7
8use crate::xdr::{self, Limits, ReadXdr, ScVal};
9use crate::{
10 config::{self, locator, network},
11 get_spec::get_remote_contract_spec,
12 rpc,
13};
14
// Command-line arguments for querying contract events.
//
// NOTE: plain `//` comments are used throughout this struct on purpose; clap's
// derive turns `///` doc comments into help text, which would change the CLI output.
#[derive(Parser, Debug, Clone)]
#[group(skip)]
pub struct Cmd {
    // Ledger sequence to start reading from. Mutually exclusive with `--cursor`;
    // exactly one of the two must be supplied (enforced by the clap attributes).
    #[allow(clippy::doc_markdown)]
    #[arg(long, conflicts_with = "cursor", required_unless_present = "cursor")]
    start_ledger: Option<u32>,

    // Cursor to resume from. Mutually exclusive with `--start-ledger`.
    #[arg(
        long,
        conflicts_with = "start_ledger",
        required_unless_present = "start_ledger"
    )]
    cursor: Option<String>,

    // How events are rendered on stdout; see `OutputFormat`. Defaults to `pretty`.
    #[arg(long, value_enum, default_value = "pretty")]
    output: OutputFormat,

    // Passed to `get_events` as its final argument (`Some(self.count)`). Defaults to 10.
    #[arg(short, long, default_value = "10")]
    count: usize,

    // Contract filters given via `--id` (one to six values per occurrence).
    // Each entry is resolved with `resolve_contract_id` in `execute`.
    #[arg(
        long = "id",
        num_args = 1..=6,
        help_heading = "FILTERS"
    )]
    contract_ids: Vec<config::UnresolvedContract>,

    // Topic filters given via `--topic`. Each value is a comma-separated list of
    // segments; `parse_topics` accepts `*`, `**`, or a base64 XDR `ScVal` per segment.
    #[arg(
        long = "topic",
        num_args = 1.., help_heading = "FILTERS"
    )]
    topic_filters: Vec<String>,

    // Event type filter given via `--type`; forwarded to `get_events`. Defaults to `all`.
    #[arg(
        long = "type",
        value_enum,
        default_value = "all",
        help_heading = "FILTERS"
    )]
    event_type: rpc::EventType,

    // Config locator arguments, also used when resolving contract IDs.
    #[command(flatten)]
    locator: locator::Args,

    // Network selection arguments.
    #[command(flatten)]
    network: network::Args,
}
92
/// Errors surfaced by the events command.
///
/// The `#[from]` variants wrap errors from other modules so they can be
/// propagated with `?`. Several of the structured variants are not constructed
/// in this part of the file; whether they are used elsewhere is not verified here.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The supplied cursor was rejected.
    #[error("cursor is not valid")]
    InvalidCursor,
    /// A referenced file path does not exist.
    #[error("filepath does not exist: {path}")]
    InvalidFile { path: String },
    /// A referenced file exists but could not be read.
    #[error("filepath ({path}) cannot be read: {error}")]
    CannotReadFile { path: String, error: String },
    /// More than five `--topic` filters were supplied (checked in `parse_topics`).
    #[error("max of 5 topic filters allowed per request, received {filter_count}")]
    MaxTopicFilters { filter_count: usize },
    /// A topic filter contained too many comma-separated segments.
    #[error("cannot parse topic filter {topic} into 1-4 segments")]
    InvalidTopicFilter { topic: String },
    /// A topic segment was neither a wildcard nor a valid base64 XDR `ScVal`.
    #[error("invalid segment ({segment}) in topic filter ({topic}): {error}")]
    InvalidSegment {
        topic: String,
        segment: String,
        error: xdr::Error,
    },
    /// A contract ID string could not be decoded as a strkey.
    #[error("cannot parse contract ID {contract_id}: {error}")]
    InvalidContractId {
        contract_id: String,
        error: stellar_strkey::DecodeError,
    },
    /// Serializing an event to JSON failed; `debug` holds the value's `Debug` rendering.
    #[error("invalid JSON string: {error} ({debug})")]
    InvalidJson {
        debug: String,
        error: serde_json::Error,
    },
    /// An event carried a timestamp that could not be interpreted.
    #[error("invalid timestamp in event: {ts}")]
    InvalidTimestamp { ts: String },
    /// Neither a start ledger nor a cursor was provided (returned by `start`).
    #[error("missing start_ledger and cursor")]
    MissingStartLedgerAndCursor,
    /// No target was specified.
    #[error("missing target")]
    MissingTarget,
    /// Error returned by the RPC client.
    #[error(transparent)]
    Rpc(#[from] rpc::Error),
    /// Any boxed error, converted via `?`.
    #[error(transparent)]
    Generic(#[from] Box<dyn std::error::Error>),
    /// I/O failure, e.g. while writing to stdout.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// XDR encoding or decoding failure.
    #[error(transparent)]
    Xdr(#[from] xdr::Error),
    /// JSON (de)serialization failure not mapped to `InvalidJson`.
    #[error(transparent)]
    Serde(#[from] serde_json::Error),
    /// Network configuration error.
    #[error(transparent)]
    Network(#[from] network::Error),
    /// Config locator error.
    #[error(transparent)]
    Locator(#[from] locator::Error),
    /// General configuration error.
    #[error(transparent)]
    Config(#[from] config::Error),
    /// Failure while fetching a contract spec.
    #[error(transparent)]
    GetSpec(#[from] crate::get_spec::Error),
}
146
// Output formats selectable with `--output`.
//
// NOTE: `//` comments are used on purpose; clap's `ValueEnum` derive turns
// `///` doc comments on variants into help text for the possible values.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, clap::ValueEnum)]
pub enum OutputFormat {
    // Spec-decoded events printed with colors enabled; events that cannot be
    // decoded fall back to the event's own `pretty_print`.
    Pretty,

    // Spec-decoded events printed with colors disabled; events that cannot be
    // decoded fall back to the event's `Display` output.
    Plain,

    // One JSON document per event: the decoded form with metadata when decoding
    // succeeds, otherwise the event serialized as-is.
    Json,

    // No contract specs are fetched and no decoding is attempted; every event is
    // printed with `pretty_print`.
    Raw,
}
161
/// Contract specs keyed by the contract ID string carried on each event.
/// A `None` value records that the spec for that contract could not be obtained.
type SpecCache = HashMap<String, Option<Spec>>;
164
/// JSON output record combining a spec-decoded event with metadata copied from
/// the RPC event it was decoded from.
#[derive(serde::Serialize, Debug)]
struct DecodedEventWithMetadata {
    /// Event ID, copied from the RPC event.
    id: String,
    /// Ledger number, copied from the RPC event.
    ledger: u32,
    /// Ledger close time string, copied verbatim from the RPC event.
    ledger_closed_at: String,
    /// Event type string from the RPC event; serialized under the key `type`.
    #[serde(rename = "type")]
    event_type: String,
    /// Contract ID as reported by the decoded event.
    contract_id: String,
    /// Event name as reported by the decoded event.
    event_name: String,
    /// Prefix topics as reported by the decoded event.
    prefix_topics: Vec<String>,
    /// Decoded parameters as reported by the decoded event, keyed by name.
    params: IndexMap<String, serde_json::Value>,
}
186
187impl Cmd {
188 pub async fn run(&mut self) -> Result<(), Error> {
189 let config = config::Args {
190 locator: self.locator.clone(),
191 network: self.network.clone(),
192 source_account: config::UnresolvedMuxedAccount::default(),
193 sign_with: config::sign_with::Args::default(),
194 fee: None,
195 inclusion_fee: None,
196 };
197 let response = self.execute(&config).await?;
198
199 if response.events.is_empty() {
200 eprintln!("No events");
201 return Ok(());
202 }
203
204 let spec_cache = if self.output == OutputFormat::Raw {
206 HashMap::new()
207 } else {
208 self.build_spec_cache(&response.events, &config).await
209 };
210
211 for event in &response.events {
212 let decoded = if self.output == OutputFormat::Raw {
213 None
214 } else {
215 Self::try_decode_event(event, &spec_cache)
216 };
217
218 match self.output {
219 OutputFormat::Pretty => {
220 if let Some(decoded) = decoded {
221 Self::print_decoded_event(&decoded, event, true)?;
222 } else {
223 event.pretty_print()?;
224 }
225 }
226 OutputFormat::Plain => {
227 if let Some(decoded) = decoded {
228 Self::print_decoded_event(&decoded, event, false)?;
229 } else {
230 println!("{event}");
231 }
232 }
233 OutputFormat::Json => {
234 if let Some(decoded) = decoded {
236 let with_metadata = DecodedEventWithMetadata {
237 id: event.id.clone(),
238 ledger: event.ledger,
239 ledger_closed_at: event.ledger_closed_at.clone(),
240 event_type: event.event_type.clone(),
241 contract_id: decoded.contract_id.clone(),
242 event_name: decoded.event_name.clone(),
243 prefix_topics: decoded.prefix_topics.clone(),
244 params: decoded.params.clone(),
245 };
246 println!(
247 "{}",
248 serde_json::to_string(&with_metadata).map_err(|e| {
249 Error::InvalidJson {
250 debug: format!("{with_metadata:#?}"),
251 error: e,
252 }
253 })?
254 );
255 } else {
256 println!(
257 "{}",
258 serde_json::to_string(&event).map_err(|e| {
259 Error::InvalidJson {
260 debug: format!("{event:#?}"),
261 error: e,
262 }
263 })?
264 );
265 }
266 }
267 OutputFormat::Raw => {
268 event.pretty_print()?;
269 }
270 }
271 }
272 Ok(())
273 }
274
275 async fn build_spec_cache(&self, events: &[rpc::Event], config: &config::Args) -> SpecCache {
277 let unique_ids: Vec<_> = events
279 .iter()
280 .map(|e| e.contract_id.clone())
281 .collect::<std::collections::HashSet<_>>()
282 .into_iter()
283 .collect();
284
285 let fetch_futures: Vec<_> = unique_ids
287 .iter()
288 .map(|id| Self::fetch_spec_for_contract(id, config))
289 .collect();
290
291 let results = futures::future::join_all(fetch_futures).await;
292
293 unique_ids.into_iter().zip(results).collect()
294 }
295
296 async fn fetch_spec_for_contract(contract_id_str: &str, config: &config::Args) -> Option<Spec> {
298 let contract_id = match stellar_strkey::Contract::from_string(contract_id_str) {
300 Ok(id) => id,
301 Err(e) => {
302 tracing::debug!("Failed to parse contract ID {contract_id_str}: {e}");
303 return None;
304 }
305 };
306
307 match get_remote_contract_spec(
308 &contract_id.0,
309 &config.locator,
310 &config.network,
311 None,
312 Some(config),
313 )
314 .await
315 {
316 Ok(spec_entries) => Some(Spec::new(&spec_entries)),
317 Err(e) => {
318 tracing::debug!(
319 "Failed to fetch spec for contract {contract_id_str}: {e}. Events from this contract will use raw format."
320 );
321 None
322 }
323 }
324 }
325
326 fn try_decode_event(event: &rpc::Event, spec_cache: &SpecCache) -> Option<DecodedEvent> {
328 let spec = spec_cache.get(&event.contract_id)?.as_ref()?;
329
330 let topics: Vec<ScVal> = event
332 .topic
333 .iter()
334 .filter_map(|t| ScVal::from_xdr_base64(t, Limits::none()).ok())
335 .collect();
336
337 if topics.len() != event.topic.len() {
338 return None; }
340
341 let data = ScVal::from_xdr_base64(&event.value, Limits::none()).ok()?;
343
344 spec.decode_event(&event.contract_id, &topics, &data)
345 .inspect_err(|e| tracing::debug!("Failed to decode event {}: {e}", event.id))
346 .ok()
347 }
348
349 fn print_decoded_event(
351 decoded: &DecodedEvent,
352 event: &rpc::Event,
353 use_colors: bool,
354 ) -> Result<(), Error> {
355 use std::io::Write;
356 use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
357
358 let color_choice = if use_colors {
359 ColorChoice::Auto
360 } else {
361 ColorChoice::Never
362 };
363 let mut stdout = StandardStream::stdout(color_choice);
364
365 stdout.set_color(ColorSpec::new().set_fg(Some(Color::Cyan)).set_bold(true))?;
367 write!(stdout, "Event")?;
368 stdout.reset()?;
369 writeln!(
370 stdout,
371 " {} [{}]:",
372 event.id,
373 event.event_type.to_uppercase()
374 )?;
375
376 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
378 write!(stdout, " Ledger: ")?;
379 stdout.reset()?;
380 writeln!(
381 stdout,
382 "{} (closed at {})",
383 event.ledger, event.ledger_closed_at
384 )?;
385
386 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
388 write!(stdout, " Contract: ")?;
389 stdout.reset()?;
390 writeln!(stdout, "{}", decoded.contract_id)?;
391
392 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
394 write!(stdout, " Event: ")?;
395 stdout.reset()?;
396 stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)).set_bold(true))?;
397 write!(stdout, "{}", decoded.event_name)?;
398 stdout.reset()?;
399 if !decoded.prefix_topics.is_empty() {
400 write!(stdout, " ({})", decoded.prefix_topics.join(", "))?;
401 }
402 writeln!(stdout)?;
403
404 if !decoded.params.is_empty() {
406 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)).set_dimmed(true))?;
407 writeln!(stdout, " Params:")?;
408 stdout.reset()?;
409 for (name, value) in &decoded.params {
410 stdout.set_color(ColorSpec::new().set_fg(Some(Color::Yellow)))?;
411 write!(stdout, " {name}")?;
412 stdout.reset()?;
413 write!(stdout, ": ")?;
414 stdout.set_color(ColorSpec::new().set_fg(Some(Color::White)))?;
415 writeln!(stdout, "{value}")?;
416 stdout.reset()?;
417 }
418 }
419
420 writeln!(stdout)?;
421 Ok(())
422 }
423
424 pub async fn execute(&self, config: &config::Args) -> Result<rpc::GetEventsResponse, Error> {
425 let start = self.start()?;
426 let network = config.get_network()?;
427 let client = network.rpc_client()?;
428 client
429 .verify_network_passphrase(Some(&network.network_passphrase))
430 .await?;
431
432 let contract_ids: Vec<String> = self
433 .contract_ids
434 .iter()
435 .map(|id| {
436 Ok(id
437 .resolve_contract_id(&self.locator, &network.network_passphrase)?
438 .to_string())
439 })
440 .collect::<Result<Vec<_>, Error>>()?;
441
442 let parsed_topics = self.parse_topics()?;
443
444 client
445 .get_events(
446 start,
447 Some(self.event_type),
448 &contract_ids,
449 &parsed_topics,
450 Some(self.count),
451 )
452 .await
453 .map_err(Error::Rpc)
454 }
455
456 fn parse_topics(&self) -> Result<Vec<rpc::TopicFilter>, Error> {
457 if self.topic_filters.len() > 5 {
458 return Err(Error::MaxTopicFilters {
459 filter_count: self.topic_filters.len(),
460 });
461 }
462 let mut topic_filters: Vec<rpc::TopicFilter> = Vec::new();
463 for topic in &self.topic_filters {
464 let mut topic_filter: rpc::TopicFilter = Vec::new(); for (i, segment) in topic.split(',').enumerate() {
466 if i > 4 {
467 return Err(Error::InvalidTopicFilter {
468 topic: topic.clone(),
469 });
470 }
471
472 if segment == "*" || segment == "**" {
473 topic_filter.push(segment.to_owned());
474 } else {
475 match xdr::ScVal::from_xdr_base64(segment, Limits::none()) {
476 Ok(_s) => {
477 topic_filter.push(segment.to_owned());
478 }
479 Err(e) => {
480 return Err(Error::InvalidSegment {
481 topic: topic.clone(),
482 segment: segment.to_string(),
483 error: e,
484 });
485 }
486 }
487 }
488 }
489 topic_filters.push(topic_filter);
490 }
491
492 Ok(topic_filters)
493 }
494
495 fn start(&self) -> Result<rpc::EventStart, Error> {
496 let start = match (self.start_ledger, self.cursor.clone()) {
497 (Some(start), _) => rpc::EventStart::Ledger(start),
498 (_, Some(c)) => rpc::EventStart::Cursor(c),
499 _ => return Err(Error::MissingStartLedgerAndCursor),
501 };
502 Ok(start)
503 }
504}