1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![doc = include_str!("../README.md")]
3#![doc(html_favicon_url = "https://zino.cc/assets/zino-logo.png")]
4#![doc(html_logo_url = "https://zino.cc/assets/zino-logo.svg")]
5#![allow(async_fn_in_trait)]
6
7use serde::de::DeserializeOwned;
8use toml::Table;
9use zino_core::{
10 AvroValue, LazyLock, Map, Record, application::StaticRecord, error::Error,
11 extension::TomlTableExt, state::State,
12};
13
14mod data_source;
15
16pub use data_source::DataSource;
17use data_source::DataSourceConnector;
18
19#[cfg(feature = "connector-arrow")]
21mod arrow;
22#[cfg(feature = "connector-http")]
23mod http;
24#[cfg(feature = "connector-mysql")]
25mod mysql;
26#[cfg(feature = "connector-postgres")]
27mod postgres;
28#[cfg(feature = "connector-sqlite")]
29mod sqlite;
30#[cfg(any(
31 feature = "connector-mysql",
32 feature = "connector-postgres",
33 feature = "connector-sqlite"
34))]
35mod sqlx_row;
36
37#[cfg(feature = "connector-http")]
38mod helper;
39
40#[cfg(feature = "connector-arrow")]
41pub use arrow::{ArrowConnector, DataFrameExecutor};
42#[cfg(feature = "connector-http")]
43pub use http::HttpConnector;
44
45pub trait Connector {
47 fn try_new_data_source(config: &Table) -> Result<DataSource, Error>;
50
51 async fn execute(&self, query: &str, params: Option<&Map>) -> Result<Option<u64>, Error>;
53
54 async fn query(&self, query: &str, params: Option<&Map>) -> Result<Vec<Record>, Error>;
56
57 async fn query_as<T: DeserializeOwned>(
59 &self,
60 query: &str,
61 params: Option<&Map>,
62 ) -> Result<Vec<T>, Error> {
63 let data = self.query(query, params).await?;
64 let value = data.into_iter().map(AvroValue::Record).collect();
65 apache_avro::from_value(&AvroValue::Array(value)).map_err(|err| err.into())
66 }
67
68 async fn query_one(&self, query: &str, params: Option<&Map>) -> Result<Option<Record>, Error>;
70
71 async fn query_one_as<T: DeserializeOwned>(
73 &self,
74 query: &str,
75 params: Option<&Map>,
76 ) -> Result<Option<T>, Error> {
77 if let Some(record) = self.query_one(query, params).await? {
78 let value = AvroValue::Union(1, Box::new(AvroValue::Record(record)));
79 apache_avro::from_value(&value).map_err(|err| err.into())
80 } else {
81 Ok(None)
82 }
83 }
84}
85
86#[derive(Debug, Clone, Copy, Default)]
88pub struct GlobalConnector;
89
90impl GlobalConnector {
91 #[inline]
93 pub fn get(name: &str) -> Option<&'static DataSource> {
94 SHARED_DATA_SOURCE_CONNECTORS.find(name)
95 }
96}
97
98static SHARED_DATA_SOURCE_CONNECTORS: LazyLock<StaticRecord<DataSource>> = LazyLock::new(|| {
100 let mut data_sources = StaticRecord::new();
101 if let Some(connectors) = State::shared().config().get_array("connector") {
102 for connector in connectors.iter().filter_map(|v| v.as_table()) {
103 let data_source_type = connector.get_str("type").unwrap_or("unkown");
104 let name = connector.get_str("name").unwrap_or(data_source_type);
105 let data_source = DataSource::try_new_data_source(connector)
106 .unwrap_or_else(|err| panic!("fail to connect data source `{name}`: {err}"));
107 data_sources.add(name, data_source);
108 }
109 }
110 data_sources
111});