yellowstone_vixen/lib.rs
1#![deny(
2 clippy::disallowed_methods,
3 clippy::suspicious,
4 clippy::style,
5 clippy::clone_on_ref_ptr,
6 missing_debug_implementations,
7 missing_copy_implementations
8)]
9#![warn(clippy::pedantic, missing_docs)]
10#![allow(clippy::module_name_repetitions)]
11
12//! Vixen provides a simple API for requesting, parsing, and consuming data
13//! from Yellowstone.
14
15use std::marker::PhantomData;
16
17use config::BufferConfig;
18use tokio::sync::{mpsc, oneshot};
19use yellowstone_grpc_proto::tonic::Status;
20
21use crate::sources::SourceExitStatus;
22
23#[cfg(feature = "prometheus")]
24pub extern crate prometheus;
25#[cfg(feature = "prometheus")]
26pub mod metrics;
27pub extern crate thiserror;
28pub extern crate yellowstone_vixen_core as vixen_core;
29pub use vixen_core::bs58;
30
31mod buffer;
32pub mod builder;
33pub mod config;
34pub mod handler;
35pub mod instruction;
36
37pub mod sources;
38
39/// Utility functions for the Vixen runtime.
40pub mod util;
41
42pub mod filter_pipeline;
43
44pub use handler::{Handler, HandlerResult, Pipeline};
45pub use util::*;
46use yellowstone_grpc_proto::geyser::SubscribeUpdate;
47pub use yellowstone_vixen_core::CommitmentLevel;
48
49use crate::{builder::RuntimeBuilder, sources::SourceTrait};
50
51/// An error thrown by the Vixen runtime.
52#[derive(Debug, thiserror::Error)]
53pub enum Error {
54 /// A system I/O error.
55 #[error("I/O error")]
56 Io(#[from] std::io::Error),
57 /// An error returned by a Yellowstone server.
58 #[error("Yellowstone client builder error")]
59 YellowstoneBuilder(#[from] yellowstone_grpc_client::GeyserGrpcBuilderError),
60 /// An error returned by a Yellowstone client.
61 #[error("Yellowstone client error")]
62 YellowstoneClient(#[from] yellowstone_grpc_client::GeyserGrpcClientError),
63 /// An error occurring when the Yellowstone client stops early.
64 #[error("Yellowstone client crashed")]
65 ClientHangup,
66 /// An error occurring when the Yellowstone server closes the connection.
67 #[error("Yellowstone stream hung up unexpectedly")]
68 ServerHangup,
69 /// A gRPC error returned by the Yellowstone server.
70 #[error("Yellowstone stream returned an error")]
71 YellowstoneStatus(#[from] yellowstone_grpc_proto::tonic::Status),
72 /// An error occurring when a datasource is not configured correctly.
73 #[error("Yellowstone stream config error")]
74 ConfigError,
75 /// An error occurring when a runtime error occurs.
76 #[error("Other error")]
77 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
78}
79
80/// The main runtime for Vixen.
81#[derive(Debug)]
82pub struct Runtime<S: SourceTrait> {
83 buffer: BufferConfig,
84 source: S::Config,
85 pipelines: handler::PipelineSets,
86 #[cfg(feature = "prometheus")]
87 metrics_registry: prometheus::Registry,
88 _source: PhantomData<S>,
89}
90
91impl<S: SourceTrait> Runtime<S> {
92 /// Create a new runtime builder.
93 pub fn builder() -> RuntimeBuilder<S> { RuntimeBuilder::<S>::default() }
94}
95impl<S: SourceTrait> Runtime<S> {
96 /// Create a new Tokio runtime and run the Vixen runtime within it,
97 /// terminating the current process if the runtime crashes.
98 ///
99 /// For error handling, use the recoverable variant [`Self::try_run`].
100 ///
101 /// If you want to provide your own tokio Runtime because you need to run
102 /// async code outside of the Vixen runtime, use the [`Self::run_async`]
103 /// method.
104 ///
105 /// # Example
106 ///
107 /// ```ignore
108 /// use yellowstone_vixen::Pipeline;
109 /// use yellowstone_vixen_spl_token_parser::{AccountParser, InstructionParser};
110 ///
111 /// // MyHandler is a handler that implements the Handler trait
112 /// // NOTE: The main function is not async
113 /// fn main() {
114 /// Runtime::builder::<YellowstoneGrpcSource>()
115 /// .account(Pipeline::new(AccountParser, [MyHandler]))
116 /// .instruction(Pipeline::new(InstructionParser, [MyHandler]))
117 /// .build(config)
118 /// .run(); // Process will exit if an error occurs
119 /// }
120 /// ```
121 #[inline]
122 pub fn run(self) { util::handle_fatal(self.try_run()); }
123
124 /// Error returning variant of [`Self::run`].
125 ///
126 /// # Errors
127 /// This function returns an error if the runtime crashes.
128 #[inline]
129 pub fn try_run(self) -> Result<(), Box<Error>> {
130 tokio::runtime::Runtime::new()
131 .map_err(|e| Box::new(e.into()))?
132 .block_on(self.try_run_async())
133 }
134
135 /// Run the Vixen runtime asynchronously, terminating the current process
136 /// if the runtime crashes.
137 ///
138 /// For error handling, use the recoverable variant [`Self::try_run_async`].
139 ///
140 /// If you don't need to run any async code outside the Vixen runtime, you
141 /// can use the [`Self::run`] method instead, which takes care of creating
142 /// a tokio Runtime for you.
143 ///
144 /// # Example
145 ///
146 /// ```ignore
147 /// use yellowstone_vixen_parser::{
148 /// token_extension_program::{
149 /// AccountParser as TokenExtensionProgramAccParser,
150 /// InstructionParser as TokenExtensionProgramIxParser,
151 /// },
152 /// token_program::{
153 /// AccountParser as TokenProgramAccParser, InstructionParser as TokenProgramIxParser,
154 /// },
155 /// };
156 ///
157 /// // MyHandler is a handler that implements the Handler trait
158 ///
159 /// #[tokio::main]
160 /// async fn main() {
161 /// Runtime::builder::<YellowstoneGrpcSource>()
162 /// .account(Pipeline::new(TokenProgramAccParser, [MyHandler]))
163 /// .account(Pipeline::new(TokenExtensionProgramAccParser, [MyHandler]))
164 /// .instruction(Pipeline::new(TokenExtensionProgramIxParser, [MyHandler]))
165 /// .instruction(Pipeline::new(TokenProgramIxParser, [MyHandler]))
166 /// .build(config)
167 /// .run_async()
168 /// .await;
169 /// }
170 /// ```
171 #[inline]
172 pub async fn run_async(self) { util::handle_fatal(self.try_run_async().await); }
173
174 /// Error returning variant of [`Self::run_async`].
175 ///
176 /// # Errors
177 /// This function returns an error if the runtime crashes.
178 ///
179 /// # Panics
180 /// Only panics if the rustls crypto provider fails to install.
181 ///
182 /// # Shutdown Flows
183 ///
184 /// ```text
185 /// ┌─────────────────────────────────────────────────────────────────────┐
186 /// │ RUNTIME SELECT! │
187 /// │ │
188 /// │ Signal ─────────────────┐ │
189 /// │ (Ctrl+C, SIGTERM) │ │
190 /// │ ▼ │
191 /// │ ┌─────────────┐ ┌─────────────┐ │
192 /// │ │Signal wins │────▶│stop_buffer()│ │
193 /// │ │select! │ │drops rx │ │
194 /// │ └─────────────┘ └──────┬──────┘ │
195 /// │ │ │ │
196 /// │ ▼ ▼ │
197 /// │ Ok(()) exit Source sees send │
198 /// │ fail, but select! │
199 /// │ already done │
200 /// │ │
201 /// ├─────────────────────────────────────────────────────────────────────┤
202 /// │ │
203 /// │ Buffer ─────────────────┐ │
204 /// │ (rx recv error/close) │ │
205 /// │ ▼ │
206 /// │ ┌─────────────┐ │
207 /// │ │Buffer wins │────▶ Err(YellowstoneStatus) │
208 /// │ │select! │ or Ok(StopCode) │
209 /// │ └─────────────┘ │
210 /// │ │
211 /// ├─────────────────────────────────────────────────────────────────────┤
212 /// │ │
213 /// │ SourceExit ─────────────┐ │
214 /// │ (source task ended) │ │
215 /// │ ▼ │
216 /// │ ┌─────────────┐ │
217 /// │ │SourceExit │ │
218 /// │ │wins select! │ │
219 /// │ └──────┬──────┘ │
220 /// │ │ │
221 /// │ ┌──────────────┬───┴──------───┬──────────────┐ │
222 /// │ ▼ ▼ ▼ ▼ │
223 /// │ Completed StreamEnded StreamError Error │
224 /// │ (finite src) (unexpected) (gRPC) (other) │
225 /// │ │ │ │ │ │
226 /// │ ▼ ▼ ▼ ▼ │
227 /// │ Ok(()) ServerHangup ServerHangup Other │
228 /// │ │
229 /// │ ┌──────────────────────────────────────────────────────────┐ │
230 /// │ │ ReceiverDropped: defensive only - normally unreachable │ │
231 /// │ │ because Signal/Buffer branch wins first when rx drops │ │
232 /// │ └──────────────────────────────────────────────────────────┘ │
233 /// └─────────────────────────────────────────────────────────────────────┘
234 /// ```
235 #[tracing::instrument("Runtime::run", skip(self))]
236 #[allow(clippy::too_many_lines)]
237 pub async fn try_run_async(self) -> Result<(), Box<Error>> {
238 enum StopType<S> {
239 Signal(S),
240 Buffer(Result<(), Error>),
241 SourceExit(Result<SourceExitStatus, oneshot::error::RecvError>),
242 }
243
244 let (tx, updates_rx) =
245 mpsc::channel::<Result<SubscribeUpdate, Status>>(self.buffer.sources_channel_size);
246
247 let (status_tx, status_rx) = oneshot::channel::<SourceExitStatus>();
248
249 #[cfg(feature = "prometheus")]
250 metrics::register_metrics(&self.metrics_registry);
251
252 let filters = self.pipelines.filters();
253
254 let source = S::new(self.source, filters);
255
256 tokio::spawn(async move {
257 let _ = source.connect(tx, status_tx).await;
258 });
259
260 let signal;
261
262 #[cfg(unix)]
263 {
264 use futures_util::stream::{FuturesUnordered, StreamExt};
265 use tokio::signal::unix::SignalKind;
266
267 let mut stream = [
268 SignalKind::hangup(),
269 SignalKind::interrupt(),
270 SignalKind::quit(),
271 SignalKind::terminate(),
272 ]
273 .into_iter()
274 .map(|k| {
275 tokio::signal::unix::signal(k).map(|mut s| async move {
276 s.recv().await;
277 Ok(k)
278 })
279 })
280 .collect::<Result<FuturesUnordered<_>, _>>()
281 .map_err(|e| Box::new(e.into()))?;
282
283 signal = async move { stream.next().await.transpose() }
284 }
285
286 #[cfg(not(unix))]
287 {
288 use std::fmt;
289
290 use futures_util::TryFutureExt;
291
292 struct CtrlC;
293
294 impl fmt::Debug for CtrlC {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("^C") }
296 }
297
298 signal = tokio::signal::ctrl_c()
299 .map_ok(|()| Some(CtrlC))
300 .map_err(Into::into);
301 }
302
303 let mut buffer = buffer::Buffer::run_yellowstone(self.buffer, updates_rx, self.pipelines);
304
305 let stop_ty = tokio::select! {
306 s = signal => StopType::Signal(s),
307 b = buffer.wait_for_stop() => StopType::Buffer(b),
308 status = status_rx => StopType::SourceExit(status),
309 };
310
311 let should_stop_buffer = !matches!(stop_ty, StopType::Buffer(..));
312
313 match stop_ty {
314 StopType::Signal(Ok(Some(s))) => {
315 tracing::warn!("{s:?} received, shutting down...");
316 Ok(())
317 },
318 StopType::Signal(Ok(None)) => Err(std::io::Error::new(
319 std::io::ErrorKind::BrokenPipe,
320 "Signal handler returned None",
321 )
322 .into()),
323 StopType::Buffer(result) => result,
324 StopType::Signal(Err(e)) => Err(e),
325 StopType::SourceExit(Ok(status)) => match status {
326 SourceExitStatus::ReceiverDropped => {
327 tracing::info!("Source stopped: receiver dropped (shutdown)");
328 Ok(())
329 },
330 SourceExitStatus::Completed => {
331 tracing::info!("Source completed successfully");
332 Ok(())
333 },
334 SourceExitStatus::StreamEnded => {
335 tracing::warn!("Source stopped: stream ended unexpectedly");
336 Err(Error::ServerHangup)
337 },
338 SourceExitStatus::StreamError { code, message } => {
339 tracing::error!(?code, %message, "Source stopped: stream error");
340 Err(Error::YellowstoneStatus(Status::new(code, message)))
341 },
342 SourceExitStatus::Error(msg) => {
343 tracing::error!(%msg, "Source stopped: error");
344 Err(Error::Other(msg.into()))
345 },
346 },
347 StopType::SourceExit(Err(_)) => {
348 tracing::warn!("Source exit status channel closed unexpectedly");
349 Err(Error::ClientHangup)
350 },
351 }?;
352
353 if should_stop_buffer {
354 Self::stop_buffer(buffer).await;
355 }
356
357 Ok(())
358 }
359
360 async fn stop_buffer(buffer: buffer::Buffer) {
361 match buffer.join().await {
362 Err(e) => tracing::warn!(err = %Chain(&e), "Error stopping runtime buffer"),
363 Ok(c) => c.as_unit(),
364 }
365 }
366}
367
368#[cfg(test)]
369mod runtime_tests;