1use clap::Parser;
2use std::io;
3
4use crate::xdr::{self, Limits, ReadXdr};
5
6use super::{global, NetworkRunnable};
7use crate::{
8 config::{self, locator, network},
9 rpc,
10};
11
12#[derive(Parser, Debug, Clone)]
13#[group(skip)]
14pub struct Cmd {
15 #[allow(clippy::doc_markdown)]
16 #[arg(long, conflicts_with = "cursor", required_unless_present = "cursor")]
19 start_ledger: Option<u32>,
20 #[arg(
22 long,
23 conflicts_with = "start_ledger",
24 required_unless_present = "start_ledger"
25 )]
26 cursor: Option<String>,
27 #[arg(long, value_enum, default_value = "pretty")]
29 output: OutputFormat,
30 #[arg(short, long, default_value = "10")]
32 count: usize,
33 #[arg(
41 long = "id",
42 num_args = 1..=6,
43 help_heading = "FILTERS"
44 )]
45 contract_ids: Vec<config::UnresolvedContract>,
46 #[arg(
63 long = "topic",
64 num_args = 1.., help_heading = "FILTERS"
66 )]
67 topic_filters: Vec<String>,
68 #[arg(
70 long = "type",
71 value_enum,
72 default_value = "all",
73 help_heading = "FILTERS"
74 )]
75 event_type: rpc::EventType,
76 #[command(flatten)]
77 locator: locator::Args,
78 #[command(flatten)]
79 network: network::Args,
80}
81
82#[derive(thiserror::Error, Debug)]
83pub enum Error {
84 #[error("cursor is not valid")]
85 InvalidCursor,
86 #[error("filepath does not exist: {path}")]
87 InvalidFile { path: String },
88 #[error("filepath ({path}) cannot be read: {error}")]
89 CannotReadFile { path: String, error: String },
90 #[error("max of 5 topic filters allowed per request, received {filter_count}")]
91 MaxTopicFilters { filter_count: usize },
92 #[error("cannot parse topic filter {topic} into 1-4 segments")]
93 InvalidTopicFilter { topic: String },
94 #[error("invalid segment ({segment}) in topic filter ({topic}): {error}")]
95 InvalidSegment {
96 topic: String,
97 segment: String,
98 error: xdr::Error,
99 },
100 #[error("cannot parse contract ID {contract_id}: {error}")]
101 InvalidContractId {
102 contract_id: String,
103 error: stellar_strkey::DecodeError,
104 },
105 #[error("invalid JSON string: {error} ({debug})")]
106 InvalidJson {
107 debug: String,
108 error: serde_json::Error,
109 },
110 #[error("invalid timestamp in event: {ts}")]
111 InvalidTimestamp { ts: String },
112 #[error("missing start_ledger and cursor")]
113 MissingStartLedgerAndCursor,
114 #[error("missing target")]
115 MissingTarget,
116 #[error(transparent)]
117 Rpc(#[from] rpc::Error),
118 #[error(transparent)]
119 Generic(#[from] Box<dyn std::error::Error>),
120 #[error(transparent)]
121 Io(#[from] io::Error),
122 #[error(transparent)]
123 Xdr(#[from] xdr::Error),
124 #[error(transparent)]
125 Serde(#[from] serde_json::Error),
126 #[error(transparent)]
127 Network(#[from] network::Error),
128 #[error(transparent)]
129 Locator(#[from] locator::Error),
130 #[error(transparent)]
131 Config(#[from] config::Error),
132}
133
134#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, clap::ValueEnum)]
135pub enum OutputFormat {
136 Pretty,
138 Plain,
140 Json,
142}
143
144impl Cmd {
145 pub async fn run(&mut self) -> Result<(), Error> {
146 let response = self.run_against_rpc_server(None, None).await?;
147
148 if response.events.is_empty() {
149 eprintln!("No events");
150 }
151
152 for event in &response.events {
153 match self.output {
154 OutputFormat::Json => {
158 println!(
159 "{}",
160 serde_json::to_string_pretty(&event).map_err(|e| {
161 Error::InvalidJson {
162 debug: format!("{event:#?}"),
163 error: e,
164 }
165 })?,
166 );
167 }
168 OutputFormat::Plain => println!("{event}"),
169 OutputFormat::Pretty => event.pretty_print()?,
170 }
171 }
172 Ok(())
173 }
174
175 fn parse_topics(&self) -> Result<Vec<rpc::TopicFilter>, Error> {
176 if self.topic_filters.len() > 5 {
177 return Err(Error::MaxTopicFilters {
178 filter_count: self.topic_filters.len(),
179 });
180 }
181 let mut topic_filters: Vec<rpc::TopicFilter> = Vec::new();
182 for topic in &self.topic_filters {
183 let mut topic_filter: rpc::TopicFilter = Vec::new(); for (i, segment) in topic.split(',').enumerate() {
185 if i > 4 {
186 return Err(Error::InvalidTopicFilter {
187 topic: topic.clone(),
188 });
189 }
190
191 if segment == "*" || segment == "**" {
192 topic_filter.push(segment.to_owned());
193 } else {
194 match xdr::ScVal::from_xdr_base64(segment, Limits::none()) {
195 Ok(_s) => {
196 topic_filter.push(segment.to_owned());
197 }
198 Err(e) => {
199 return Err(Error::InvalidSegment {
200 topic: topic.clone(),
201 segment: segment.to_string(),
202 error: e,
203 });
204 }
205 }
206 }
207 }
208 topic_filters.push(topic_filter);
209 }
210
211 Ok(topic_filters)
212 }
213
214 fn start(&self) -> Result<rpc::EventStart, Error> {
215 let start = match (self.start_ledger, self.cursor.clone()) {
216 (Some(start), _) => rpc::EventStart::Ledger(start),
217 (_, Some(c)) => rpc::EventStart::Cursor(c),
218 _ => return Err(Error::MissingStartLedgerAndCursor),
220 };
221 Ok(start)
222 }
223}
224
225#[async_trait::async_trait]
226impl NetworkRunnable for Cmd {
227 type Error = Error;
228 type Result = rpc::GetEventsResponse;
229
230 async fn run_against_rpc_server(
231 &self,
232 _args: Option<&global::Args>,
233 config: Option<&config::Args>,
234 ) -> Result<rpc::GetEventsResponse, Error> {
235 let start = self.start()?;
236 let network = if let Some(config) = config {
237 Ok(config.get_network()?)
238 } else {
239 self.network.get(&self.locator)
240 }?;
241
242 let client = network.rpc_client()?;
243 client
244 .verify_network_passphrase(Some(&network.network_passphrase))
245 .await?;
246
247 let contract_ids: Vec<String> = self
248 .contract_ids
249 .iter()
250 .map(|id| {
251 Ok(id
252 .resolve_contract_id(&self.locator, &network.network_passphrase)?
253 .to_string())
254 })
255 .collect::<Result<Vec<_>, Error>>()?;
256
257 let parsed_topics = self.parse_topics()?;
258
259 Ok(client
260 .get_events(
261 start,
262 Some(self.event_type),
263 &contract_ids,
264 &parsed_topics,
265 Some(self.count),
266 )
267 .await
268 .map_err(Error::Rpc)?)
269 }
270}