zino_connector/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![doc(html_favicon_url = "https://zino.cc/assets/zino-logo.png")]
4#![doc(html_logo_url = "https://zino.cc/assets/zino-logo.svg")]
5#![allow(async_fn_in_trait)]
6
7use serde::de::DeserializeOwned;
8use toml::Table;
9use zino_core::{
10    AvroValue, LazyLock, Map, Record, application::StaticRecord, error::Error,
11    extension::TomlTableExt, state::State,
12};
13
14mod data_source;
15
16pub use data_source::DataSource;
17use data_source::DataSourceConnector;
18
19/// Supported connectors.
20#[cfg(feature = "connector-arrow")]
21mod arrow;
22#[cfg(feature = "connector-http")]
23mod http;
24#[cfg(feature = "connector-mysql")]
25mod mysql;
26#[cfg(feature = "connector-postgres")]
27mod postgres;
28#[cfg(feature = "connector-sqlite")]
29mod sqlite;
30#[cfg(any(
31    feature = "connector-mysql",
32    feature = "connector-postgres",
33    feature = "connector-sqlite"
34))]
35mod sqlx_row;
36
37#[cfg(feature = "connector-http")]
38mod helper;
39
40#[cfg(feature = "connector-arrow")]
41pub use arrow::{ArrowConnector, DataFrameExecutor};
42#[cfg(feature = "connector-http")]
43pub use http::HttpConnector;
44
45/// Underlying trait of all data sources for implementors.
46pub trait Connector {
47    /// Constructs a new data source with the configuration,
48    /// returning an error if it fails.
49    fn try_new_data_source(config: &Table) -> Result<DataSource, Error>;
50
51    /// Executes the query and returns the total number of rows affected.
52    async fn execute(&self, query: &str, params: Option<&Map>) -> Result<Option<u64>, Error>;
53
54    /// Executes the query and parses it as `Vec<Record>`.
55    async fn query(&self, query: &str, params: Option<&Map>) -> Result<Vec<Record>, Error>;
56
57    /// Executes the query and parses it as `Vec<T>`.
58    async fn query_as<T: DeserializeOwned>(
59        &self,
60        query: &str,
61        params: Option<&Map>,
62    ) -> Result<Vec<T>, Error> {
63        let data = self.query(query, params).await?;
64        let value = data.into_iter().map(AvroValue::Record).collect();
65        apache_avro::from_value(&AvroValue::Array(value)).map_err(|err| err.into())
66    }
67
68    /// Executes the query and parses it as a `Record`.
69    async fn query_one(&self, query: &str, params: Option<&Map>) -> Result<Option<Record>, Error>;
70
71    /// Executes the query and parses it as an instance of type `T`.
72    async fn query_one_as<T: DeserializeOwned>(
73        &self,
74        query: &str,
75        params: Option<&Map>,
76    ) -> Result<Option<T>, Error> {
77        if let Some(record) = self.query_one(query, params).await? {
78            let value = AvroValue::Union(1, Box::new(AvroValue::Record(record)));
79            apache_avro::from_value(&value).map_err(|err| err.into())
80        } else {
81            Ok(None)
82        }
83    }
84}
85
86/// Global access to the shared data source connectors.
87#[derive(Debug, Clone, Copy, Default)]
88pub struct GlobalConnector;
89
90impl GlobalConnector {
91    /// Gets the data source for the specific service.
92    #[inline]
93    pub fn get(name: &str) -> Option<&'static DataSource> {
94        SHARED_DATA_SOURCE_CONNECTORS.find(name)
95    }
96}
97
98/// Shared connectors.
99static SHARED_DATA_SOURCE_CONNECTORS: LazyLock<StaticRecord<DataSource>> = LazyLock::new(|| {
100    let mut data_sources = StaticRecord::new();
101    if let Some(connectors) = State::shared().config().get_array("connector") {
102        for connector in connectors.iter().filter_map(|v| v.as_table()) {
103            let data_source_type = connector.get_str("type").unwrap_or("unkown");
104            let name = connector.get_str("name").unwrap_or(data_source_type);
105            let data_source = DataSource::try_new_data_source(connector)
106                .unwrap_or_else(|err| panic!("fail to connect data source `{name}`: {err}"));
107            data_sources.add(name, data_source);
108        }
109    }
110    data_sources
111});