1use clap::{arg, command, Parser};
2use std::io;
3
4use crate::xdr::{self, Limits, ReadXdr};
5
6use super::{global, NetworkRunnable};
7use crate::{
8 config::{self, locator, network},
9 rpc,
10};
11
12#[derive(Parser, Debug, Clone)]
13#[group(skip)]
14pub struct Cmd {
15 #[allow(clippy::doc_markdown)]
16 #[arg(long, conflicts_with = "cursor", required_unless_present = "cursor")]
19 start_ledger: Option<u32>,
20 #[arg(
22 long,
23 conflicts_with = "start_ledger",
24 required_unless_present = "start_ledger"
25 )]
26 cursor: Option<String>,
27 #[arg(long, value_enum, default_value = "pretty")]
29 output: OutputFormat,
30 #[arg(short, long, default_value = "10")]
32 count: usize,
33 #[arg(
41 long = "id",
42 num_args = 1..=6,
43 help_heading = "FILTERS"
44 )]
45 contract_ids: Vec<config::UnresolvedContract>,
46 #[arg(
57 long = "topic",
58 num_args = 1..=5,
59 help_heading = "FILTERS"
60 )]
61 topic_filters: Vec<String>,
62 #[arg(
64 long = "type",
65 value_enum,
66 default_value = "all",
67 help_heading = "FILTERS"
68 )]
69 event_type: rpc::EventType,
70 #[command(flatten)]
71 locator: locator::Args,
72 #[command(flatten)]
73 network: network::Args,
74}
75
76#[derive(thiserror::Error, Debug)]
77pub enum Error {
78 #[error("cursor is not valid")]
79 InvalidCursor,
80 #[error("filepath does not exist: {path}")]
81 InvalidFile { path: String },
82 #[error("filepath ({path}) cannot be read: {error}")]
83 CannotReadFile { path: String, error: String },
84 #[error("cannot parse topic filter {topic} into 1-4 segments")]
85 InvalidTopicFilter { topic: String },
86 #[error("invalid segment ({segment}) in topic filter ({topic}): {error}")]
87 InvalidSegment {
88 topic: String,
89 segment: String,
90 error: xdr::Error,
91 },
92 #[error("cannot parse contract ID {contract_id}: {error}")]
93 InvalidContractId {
94 contract_id: String,
95 error: stellar_strkey::DecodeError,
96 },
97 #[error("invalid JSON string: {error} ({debug})")]
98 InvalidJson {
99 debug: String,
100 error: serde_json::Error,
101 },
102 #[error("invalid timestamp in event: {ts}")]
103 InvalidTimestamp { ts: String },
104 #[error("missing start_ledger and cursor")]
105 MissingStartLedgerAndCursor,
106 #[error("missing target")]
107 MissingTarget,
108 #[error(transparent)]
109 Rpc(#[from] rpc::Error),
110 #[error(transparent)]
111 Generic(#[from] Box<dyn std::error::Error>),
112 #[error(transparent)]
113 Io(#[from] io::Error),
114 #[error(transparent)]
115 Xdr(#[from] xdr::Error),
116 #[error(transparent)]
117 Serde(#[from] serde_json::Error),
118 #[error(transparent)]
119 Network(#[from] network::Error),
120 #[error(transparent)]
121 Locator(#[from] locator::Error),
122 #[error(transparent)]
123 Config(#[from] config::Error),
124}
125
126#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, clap::ValueEnum)]
127pub enum OutputFormat {
128 Pretty,
130 Plain,
132 Json,
134}
135
136impl Cmd {
137 pub async fn run(&mut self) -> Result<(), Error> {
138 for topic in &self.topic_filters {
140 for (i, segment) in topic.split(',').enumerate() {
141 if i > 4 {
142 return Err(Error::InvalidTopicFilter {
143 topic: topic.clone(),
144 });
145 }
146
147 if segment != "*" {
148 if let Err(e) = xdr::ScVal::from_xdr_base64(segment, Limits::none()) {
149 return Err(Error::InvalidSegment {
150 topic: topic.clone(),
151 segment: segment.to_string(),
152 error: e,
153 });
154 }
155 }
156 }
157 }
158
159 let response = self.run_against_rpc_server(None, None).await?;
160
161 if response.events.is_empty() {
162 eprintln!("No events");
163 }
164
165 for event in &response.events {
166 match self.output {
167 OutputFormat::Json => {
171 println!(
172 "{}",
173 serde_json::to_string_pretty(&event).map_err(|e| {
174 Error::InvalidJson {
175 debug: format!("{event:#?}"),
176 error: e,
177 }
178 })?,
179 );
180 }
181 OutputFormat::Plain => println!("{event}"),
182 OutputFormat::Pretty => event.pretty_print()?,
183 }
184 }
185 Ok(())
186 }
187
188 fn start(&self) -> Result<rpc::EventStart, Error> {
189 let start = match (self.start_ledger, self.cursor.clone()) {
190 (Some(start), _) => rpc::EventStart::Ledger(start),
191 (_, Some(c)) => rpc::EventStart::Cursor(c),
192 _ => return Err(Error::MissingStartLedgerAndCursor),
194 };
195 Ok(start)
196 }
197}
198
199#[async_trait::async_trait]
200impl NetworkRunnable for Cmd {
201 type Error = Error;
202 type Result = rpc::GetEventsResponse;
203
204 async fn run_against_rpc_server(
205 &self,
206 _args: Option<&global::Args>,
207 config: Option<&config::Args>,
208 ) -> Result<rpc::GetEventsResponse, Error> {
209 let start = self.start()?;
210 let network = if let Some(config) = config {
211 Ok(config.get_network()?)
212 } else {
213 self.network.get(&self.locator)
214 }?;
215
216 let client = network.rpc_client()?;
217 client
218 .verify_network_passphrase(Some(&network.network_passphrase))
219 .await?;
220
221 let contract_ids: Vec<String> = self
222 .contract_ids
223 .iter()
224 .map(|id| {
225 Ok(id
226 .resolve_contract_id(&self.locator, &network.network_passphrase)?
227 .to_string())
228 })
229 .collect::<Result<Vec<_>, Error>>()?;
230
231 Ok(client
232 .get_events(
233 start,
234 Some(self.event_type),
235 &contract_ids,
236 &self.topic_filters,
237 Some(self.count),
238 )
239 .await
240 .map_err(Error::Rpc)?)
241 }
242}