Skip to main content

yellowstone_vixen/
lib.rs

1#![deny(
2    clippy::disallowed_methods,
3    clippy::suspicious,
4    clippy::style,
5    clippy::clone_on_ref_ptr,
6    missing_debug_implementations,
7    missing_copy_implementations
8)]
9#![warn(clippy::pedantic, missing_docs)]
10#![allow(clippy::module_name_repetitions)]
11
12//! Vixen provides a simple API for requesting, parsing, and consuming data
13//! from Yellowstone.
14
15use std::marker::PhantomData;
16
17use config::BufferConfig;
18use tokio::sync::{mpsc, oneshot};
19use yellowstone_grpc_proto::tonic::Status;
20
21use crate::sources::SourceExitStatus;
22
23#[cfg(feature = "prometheus")]
24pub extern crate prometheus;
25#[cfg(feature = "prometheus")]
26pub mod metrics;
27pub extern crate thiserror;
28pub extern crate yellowstone_vixen_core as vixen_core;
29pub use vixen_core::bs58;
30
31mod buffer;
32pub mod builder;
33pub mod config;
34pub mod handler;
35pub mod instruction;
36
37pub mod sources;
38
39/// Utility functions for the Vixen runtime.
40pub mod util;
41
42pub mod filter_pipeline;
43
44pub use handler::{Handler, HandlerResult, Pipeline};
45pub use util::*;
46use yellowstone_grpc_proto::geyser::SubscribeUpdate;
47pub use yellowstone_vixen_core::CommitmentLevel;
48
49use crate::{builder::RuntimeBuilder, sources::SourceTrait};
50
51/// An error thrown by the Vixen runtime.
52#[derive(Debug, thiserror::Error)]
53pub enum Error {
54    /// A system I/O error.
55    #[error("I/O error")]
56    Io(#[from] std::io::Error),
57    /// An error returned by a Yellowstone server.
58    #[error("Yellowstone client builder error")]
59    YellowstoneBuilder(#[from] yellowstone_grpc_client::GeyserGrpcBuilderError),
60    /// An error returned by a Yellowstone client.
61    #[error("Yellowstone client error")]
62    YellowstoneClient(#[from] yellowstone_grpc_client::GeyserGrpcClientError),
63    /// An error occurring when the Yellowstone client stops early.
64    #[error("Yellowstone client crashed")]
65    ClientHangup,
66    /// An error occurring when the Yellowstone server closes the connection.
67    #[error("Yellowstone stream hung up unexpectedly")]
68    ServerHangup,
69    /// A gRPC error returned by the Yellowstone server.
70    #[error("Yellowstone stream returned an error")]
71    YellowstoneStatus(#[from] yellowstone_grpc_proto::tonic::Status),
72    /// An error occurring when a datasource is not configured correctly.
73    #[error("Yellowstone stream config error")]
74    ConfigError,
75    /// An error occurring when a runtime error occurs.
76    #[error("Other error")]
77    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
78}
79
80/// The main runtime for Vixen.
81#[derive(Debug)]
82pub struct Runtime<S: SourceTrait> {
83    buffer: BufferConfig,
84    source: S::Config,
85    pipelines: handler::PipelineSets,
86    #[cfg(feature = "prometheus")]
87    metrics_registry: prometheus::Registry,
88    _source: PhantomData<S>,
89}
90
91impl<S: SourceTrait> Runtime<S> {
92    /// Create a new runtime builder.
93    pub fn builder() -> RuntimeBuilder<S> { RuntimeBuilder::<S>::default() }
94}
95impl<S: SourceTrait> Runtime<S> {
96    /// Create a new Tokio runtime and run the Vixen runtime within it,
97    /// terminating the current process if the runtime crashes.
98    ///
99    /// For error handling, use the recoverable variant [`Self::try_run`].
100    ///
101    /// If you want to provide your own tokio Runtime because you need to run
102    /// async code outside of the Vixen runtime, use the [`Self::run_async`]
103    /// method.
104    ///
105    /// # Example
106    ///
107    /// ```ignore
108    /// use yellowstone_vixen::Pipeline;
109    /// use yellowstone_vixen_spl_token_parser::{AccountParser, InstructionParser};
110    ///
111    /// // MyHandler is a handler that implements the Handler trait
112    /// // NOTE: The main function is not async
113    /// fn main() {
114    ///     Runtime::builder::<YellowstoneGrpcSource>()
115    ///         .account(Pipeline::new(AccountParser, [MyHandler]))
116    ///         .instruction(Pipeline::new(InstructionParser, [MyHandler]))
117    ///         .build(config)
118    ///         .run(); // Process will exit if an error occurs
119    /// }
120    /// ```
121    #[inline]
122    pub fn run(self) { util::handle_fatal(self.try_run()); }
123
124    /// Error returning variant of [`Self::run`].
125    ///
126    /// # Errors
127    /// This function returns an error if the runtime crashes.
128    #[inline]
129    pub fn try_run(self) -> Result<(), Box<Error>> {
130        tokio::runtime::Runtime::new()
131            .map_err(|e| Box::new(e.into()))?
132            .block_on(self.try_run_async())
133    }
134
135    /// Run the Vixen runtime asynchronously, terminating the current process
136    /// if the runtime crashes.
137    ///
138    /// For error handling, use the recoverable variant [`Self::try_run_async`].
139    ///
140    /// If you don't need to run any async code outside the Vixen runtime, you
141    /// can use the [`Self::run`] method instead, which takes care of creating
142    /// a tokio Runtime for you.
143    ///
144    /// # Example
145    ///
146    /// ```ignore
147    /// use yellowstone_vixen_parser::{
148    ///     token_extension_program::{
149    ///         AccountParser as TokenExtensionProgramAccParser,
150    ///         InstructionParser as TokenExtensionProgramIxParser,
151    ///     },
152    ///     token_program::{
153    ///         AccountParser as TokenProgramAccParser, InstructionParser as TokenProgramIxParser,
154    ///     },
155    /// };
156    ///
157    /// // MyHandler is a handler that implements the Handler trait
158    ///
159    /// #[tokio::main]
160    /// async fn main() {
161    ///     Runtime::builder::<YellowstoneGrpcSource>()
162    ///         .account(Pipeline::new(TokenProgramAccParser, [MyHandler]))
163    ///         .account(Pipeline::new(TokenExtensionProgramAccParser, [MyHandler]))
164    ///         .instruction(Pipeline::new(TokenExtensionProgramIxParser, [MyHandler]))
165    ///         .instruction(Pipeline::new(TokenProgramIxParser, [MyHandler]))
166    ///         .build(config)
167    ///         .run_async()
168    ///         .await;
169    /// }
170    /// ```
171    #[inline]
172    pub async fn run_async(self) { util::handle_fatal(self.try_run_async().await); }
173
174    /// Error returning variant of [`Self::run_async`].
175    ///
176    /// # Errors
177    /// This function returns an error if the runtime crashes.
178    ///
179    /// # Panics
180    /// Only panics if the rustls crypto provider fails to install.
181    ///
182    /// # Shutdown Flows
183    ///
184    /// ```text
185    /// ┌─────────────────────────────────────────────────────────────────────┐
186    /// │                         RUNTIME SELECT!                             │
187    /// │                                                                     │
188    /// │   Signal ─────────────────┐                                         │
189    /// │   (Ctrl+C, SIGTERM)       │                                         │
190    /// │                           ▼                                         │
191    /// │                    ┌─────────────┐     ┌─────────────┐              │
192    /// │                    │Signal wins  │────▶│stop_buffer()│              │
193    /// │                    │select!      │     │drops rx     │              │
194    /// │                    └─────────────┘     └──────┬──────┘              │
195    /// │                           │                   │                     │
196    /// │                           ▼                   ▼                     │
197    /// │                      Ok(()) exit      Source sees send              │
198    /// │                                       fail, but select!             │
199    /// │                                       already done                  │
200    /// │                                                                     │
201    /// ├─────────────────────────────────────────────────────────────────────┤
202    /// │                                                                     │
203    /// │   Buffer ─────────────────┐                                         │
204    /// │   (rx recv error/close)   │                                         │
205    /// │                           ▼                                         │
206    /// │                    ┌─────────────┐                                  │
207    /// │                    │Buffer wins  │────▶ Err(YellowstoneStatus)      │
208    /// │                    │select!      │      or Ok(StopCode)             │
209    /// │                    └─────────────┘                                  │
210    /// │                                                                     │
211    /// ├─────────────────────────────────────────────────────────────────────┤
212    /// │                                                                     │
213    /// │   SourceExit ─────────────┐                                         │
214    /// │   (source task ended)     │                                         │
215    /// │                           ▼                                         │
216    /// │                    ┌─────────────┐                                  │
217    /// │                    │SourceExit   │                                  │
218    /// │                    │wins select! │                                  │
219    /// │                    └──────┬──────┘                                  │
220    /// │                           │                                         │
221    /// │        ┌──────────────┬───┴──------───┬──────────────┐              │
222    /// │        ▼              ▼               ▼              ▼              │
223    /// │   Completed      StreamEnded      StreamError     Error             │
224    /// │   (finite src)  (unexpected)        (gRPC)        (other)           │
225    /// │        │              │               │              │              │
226    /// │        ▼              ▼               ▼              ▼              │
227    /// │      Ok(())      ServerHangup     ServerHangup     Other            │
228    /// │                                                                     │
229    /// │    ┌──────────────────────────────────────────────────────────┐     │
230    /// │    │ ReceiverDropped: defensive only - normally unreachable   │     │
231    /// │    │ because Signal/Buffer branch wins first when rx drops    │     │
232    /// │    └──────────────────────────────────────────────────────────┘     │
233    /// └─────────────────────────────────────────────────────────────────────┘
234    /// ```
235    #[tracing::instrument("Runtime::run", skip(self))]
236    #[allow(clippy::too_many_lines)]
237    pub async fn try_run_async(self) -> Result<(), Box<Error>> {
238        enum StopType<S> {
239            Signal(S),
240            Buffer(Result<(), Error>),
241            SourceExit(Result<SourceExitStatus, oneshot::error::RecvError>),
242        }
243
244        let (tx, updates_rx) =
245            mpsc::channel::<Result<SubscribeUpdate, Status>>(self.buffer.sources_channel_size);
246
247        let (status_tx, status_rx) = oneshot::channel::<SourceExitStatus>();
248
249        #[cfg(feature = "prometheus")]
250        metrics::register_metrics(&self.metrics_registry);
251
252        let filters = self.pipelines.filters();
253
254        let source = S::new(self.source, filters);
255
256        tokio::spawn(async move {
257            let _ = source.connect(tx, status_tx).await;
258        });
259
260        let signal;
261
262        #[cfg(unix)]
263        {
264            use futures_util::stream::{FuturesUnordered, StreamExt};
265            use tokio::signal::unix::SignalKind;
266
267            let mut stream = [
268                SignalKind::hangup(),
269                SignalKind::interrupt(),
270                SignalKind::quit(),
271                SignalKind::terminate(),
272            ]
273            .into_iter()
274            .map(|k| {
275                tokio::signal::unix::signal(k).map(|mut s| async move {
276                    s.recv().await;
277                    Ok(k)
278                })
279            })
280            .collect::<Result<FuturesUnordered<_>, _>>()
281            .map_err(|e| Box::new(e.into()))?;
282
283            signal = async move { stream.next().await.transpose() }
284        }
285
286        #[cfg(not(unix))]
287        {
288            use std::fmt;
289
290            use futures_util::TryFutureExt;
291
292            struct CtrlC;
293
294            impl fmt::Debug for CtrlC {
295                fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.write_str("^C") }
296            }
297
298            signal = tokio::signal::ctrl_c()
299                .map_ok(|()| Some(CtrlC))
300                .map_err(Into::into);
301        }
302
303        let mut buffer = buffer::Buffer::run_yellowstone(self.buffer, updates_rx, self.pipelines);
304
305        let stop_ty = tokio::select! {
306            s = signal => StopType::Signal(s),
307            b = buffer.wait_for_stop() => StopType::Buffer(b),
308            status = status_rx => StopType::SourceExit(status),
309        };
310
311        let should_stop_buffer = !matches!(stop_ty, StopType::Buffer(..));
312
313        match stop_ty {
314            StopType::Signal(Ok(Some(s))) => {
315                tracing::warn!("{s:?} received, shutting down...");
316                Ok(())
317            },
318            StopType::Signal(Ok(None)) => Err(std::io::Error::new(
319                std::io::ErrorKind::BrokenPipe,
320                "Signal handler returned None",
321            )
322            .into()),
323            StopType::Buffer(result) => result,
324            StopType::Signal(Err(e)) => Err(e),
325            StopType::SourceExit(Ok(status)) => match status {
326                SourceExitStatus::ReceiverDropped => {
327                    tracing::info!("Source stopped: receiver dropped (shutdown)");
328                    Ok(())
329                },
330                SourceExitStatus::Completed => {
331                    tracing::info!("Source completed successfully");
332                    Ok(())
333                },
334                SourceExitStatus::StreamEnded => {
335                    tracing::warn!("Source stopped: stream ended unexpectedly");
336                    Err(Error::ServerHangup)
337                },
338                SourceExitStatus::StreamError { code, message } => {
339                    tracing::error!(?code, %message, "Source stopped: stream error");
340                    Err(Error::YellowstoneStatus(Status::new(code, message)))
341                },
342                SourceExitStatus::Error(msg) => {
343                    tracing::error!(%msg, "Source stopped: error");
344                    Err(Error::Other(msg.into()))
345                },
346            },
347            StopType::SourceExit(Err(_)) => {
348                tracing::warn!("Source exit status channel closed unexpectedly");
349                Err(Error::ClientHangup)
350            },
351        }?;
352
353        if should_stop_buffer {
354            Self::stop_buffer(buffer).await;
355        }
356
357        Ok(())
358    }
359
360    async fn stop_buffer(buffer: buffer::Buffer) {
361        match buffer.join().await {
362            Err(e) => tracing::warn!(err = %Chain(&e), "Error stopping runtime buffer"),
363            Ok(c) => c.as_unit(),
364        }
365    }
366}
367
368#[cfg(test)]
369mod runtime_tests;