use std::sync::Arc;
use actix_web::{HttpRequest, HttpResponse, get, web};
use crate::backend::Backend;
pub mod v1;
/// Shared, reference-counted handle to the active [`Backend`] trait object,
/// wrapped in actix-web's `web::Data` so handlers can take it as an argument.
pub type BackendData = web::Data<Arc<dyn Backend>>;

/// Query limit settings.
#[derive(Debug, Clone, Copy)]
pub struct QueryLimits {
    /// Largest page size allowed.
    // NOTE(review): presumably counted in rows per page — confirm against the
    // code that consumes this setting.
    pub max_page_size: u64,
}

impl Default for QueryLimits {
    /// Builds the default limits: `max_page_size` of 100,000.
    fn default() -> Self {
        QueryLimits {
            max_page_size: 100_000,
        }
    }
}
/// SQL-related settings.
#[derive(Debug, Clone, Copy)]
pub struct SqlSettings {
    /// Feature switch; `false` in the default configuration.
    pub enabled: bool,
    /// Row cap.
    // NOTE(review): presumably the maximum number of rows a SQL query may
    // return — confirm against the code that consumes this setting.
    pub max_rows: u64,
}

impl Default for SqlSettings {
    /// Builds the default settings: disabled, with `max_rows` of 100,000.
    fn default() -> Self {
        SqlSettings {
            enabled: false,
            max_rows: 100_000,
        }
    }
}
/// Media type string for the Arrow IPC stream format.
pub const ARROW_IPC_MIME: &str = "application/vnd.apache.arrow.stream";

/// `GET /health` — unconditional health probe.
///
/// Always answers `200 OK` with the fixed JSON document `{"status":"ok"}`.
#[get("/health")]
pub async fn health() -> HttpResponse {
    const BODY: &str = r#"{"status":"ok"}"#;

    HttpResponse::Ok()
        .content_type("application/json")
        .body(BODY)
}
/// `GET /healthz` — liveness-style probe that does no checks of its own.
///
/// Always answers `200 OK` with the fixed JSON document `{"status":"ok"}`.
#[get("/healthz")]
pub async fn healthz() -> HttpResponse {
    let payload = r#"{"status":"ok"}"#;

    HttpResponse::Ok()
        .content_type("application/json")
        .body(payload)
}
/// `GET /readyz` — readiness probe based on what the backend reports.
///
/// * `503 Service Unavailable` with a "no datasets registered" JSON body when
///   `backend.names()` is empty.
/// * `200 OK` with `{"status":"ready","datasets":<count>}` otherwise, where
///   `<count>` is the number of names the backend returned.
#[get("/readyz")]
pub async fn readyz(backend: BackendData) -> HttpResponse {
    let names = backend.names();

    // Guard clause: nothing registered yet, so report "not ready".
    if names.is_empty() {
        return HttpResponse::ServiceUnavailable()
            .content_type("application/json")
            .body(r#"{"status":"not ready","reason":"no datasets registered"}"#);
    }

    let dataset_count = names.len();
    let payload = format!(r#"{{"status":"ready","datasets":{}}}"#, dataset_count);
    HttpResponse::Ok()
        .content_type("application/json")
        .body(payload)
}
/// Build metadata, serializable with serde.
///
/// Optional fields are left out of the serialized output entirely when they
/// are `None` (via `skip_serializing_if`).
#[derive(Clone, Debug, serde::Serialize)]
pub struct BuildInfo {
    /// Package name.
    pub name: &'static str,
    /// Package version.
    pub version: &'static str,
    /// Label of the backend in use, supplied by the caller of [`BuildInfo::new`].
    pub backend: &'static str,
    /// Git revision identifier, if one was recorded at build time.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub git_sha: Option<&'static str>,
    /// Build timestamp, if one was recorded at build time.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub build_time: Option<&'static str>,
    /// Build profile label.
    pub profile: &'static str,
    /// Build target, if one was recorded at build time.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub target: Option<&'static str>,
}
impl BuildInfo {
pub fn new(backend: &'static str) -> Self {
Self {
name: env!("CARGO_PKG_NAME"),
version: env!("CARGO_PKG_VERSION"),
backend,
git_sha: option_env!("DATAPRESS_GIT_SHA"),
build_time: option_env!("DATAPRESS_BUILD_TIME"),
profile: if cfg!(debug_assertions) {
"debug"
} else {
"release"
},
target: option_env!("DATAPRESS_TARGET"),
}
}
}
/// `GET /version` — returns the registered [`BuildInfo`] serialized as JSON
/// with a `200 OK` status.
#[get("/version")]
pub async fn version(info: web::Data<BuildInfo>) -> HttpResponse {
    let build_info = info.get_ref();
    HttpResponse::Ok().json(build_info)
}
/// Decides whether the client asked for an Arrow response.
///
/// Returns `true` when either of these holds:
/// 1. the raw query string contains a `format=<value>` pair whose value equals
///    `arrow` ignoring ASCII case (the key must be exactly `format`; the query
///    string is not percent-decoded here), or
/// 2. the `Accept` header lists [`ARROW_IPC_MIME`] as one of its
///    comma-separated media types, compared ignoring ASCII case and ignoring
///    any `;`-separated parameters.
///
/// A missing `Accept` header, or one that is not valid as a string, counts as
/// "not requested".
pub(crate) fn wants_arrow(http: &HttpRequest) -> bool {
    // --- 1. explicit `?format=arrow` in the query string -------------------
    let is_arrow_param = |pair: &str| match pair.split_once('=') {
        Some((key, value)) => key == "format" && value.eq_ignore_ascii_case("arrow"),
        None => false,
    };
    let query = http.query_string();
    if !query.is_empty() && query.split('&').any(is_arrow_param) {
        return true;
    }

    // --- 2. content negotiation through the `Accept` header ----------------
    let accept = match http
        .headers()
        .get(actix_web::http::header::ACCEPT)
        .and_then(|value| value.to_str().ok())
    {
        Some(text) => text,
        None => return false,
    };

    accept.split(',').any(|entry| {
        // Keep only the media type; drop parameters such as `;q=0.9`.
        let media_type = match entry.split_once(';') {
            Some((before_params, _)) => before_params,
            None => entry,
        };
        media_type.trim().eq_ignore_ascii_case(ARROW_IPC_MIME)
    })
}
/// Media type string for Parquet payloads.
pub const PARQUET_MIME: &str = "application/vnd.apache.parquet";

/// In-memory cache mapping a name to a shared, immutable byte buffer,
/// protected by a read/write lock.
///
/// Every operation is best-effort with respect to lock poisoning: if the lock
/// is poisoned, reads miss and writes are skipped instead of panicking.
// NOTE(review): presumably keyed by dataset name and holding Parquet-encoded
// bytes — confirm against the callers.
#[derive(Default)]
pub struct ParquetCache {
    inner: std::sync::RwLock<std::collections::HashMap<String, Arc<bytes::Bytes>>>,
}

impl ParquetCache {
    /// Looks up the buffer stored under `name`.
    ///
    /// Returns `None` when there is no entry, or when the lock is poisoned.
    pub fn get(&self, name: &str) -> Option<Arc<bytes::Bytes>> {
        match self.inner.read() {
            Ok(entries) => entries.get(name).cloned(),
            Err(_) => None,
        }
    }

    /// Stores `bytes` under `name`, replacing any previous entry, and returns
    /// the shared handle to the stored buffer.
    ///
    /// The handle is returned even if the lock is poisoned; in that case the
    /// buffer simply is not cached.
    pub fn insert(&self, name: &str, bytes: bytes::Bytes) -> Arc<bytes::Bytes> {
        let handle = Arc::new(bytes);
        if let Ok(mut entries) = self.inner.write() {
            entries.insert(name.to_owned(), Arc::clone(&handle));
        }
        handle
    }

    /// Drops the entry stored under `name`, if any.
    ///
    /// Does nothing when the lock is poisoned.
    pub fn invalidate(&self, name: &str) {
        if let Ok(mut entries) = self.inner.write() {
            entries.remove(name);
        }
    }
}
/// Inclusive byte span `start..=end` selected by a `Range` header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ByteRange {
    start: u64,
    end: u64,
}

/// Parses one numeric position (or suffix length) of a byte-range spec.
///
/// Surrounding whitespace is tolerated. The value must consist of ASCII
/// digits only, so signs such as `+5` (which `str::parse::<u64>` would accept)
/// are rejected. A digit string too large for `u64` saturates to `u64::MAX`
/// rather than failing, so callers can clamp it against the body length.
fn parse_range_number(text: &str) -> Result<u64, ()> {
    let digits = text.trim();
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return Err(());
    }
    // An all-digit string can only fail to parse by overflowing `u64`.
    Ok(digits.parse::<u64>().unwrap_or(u64::MAX))
}

/// Interprets a `Range` header value against a body of `total` bytes.
///
/// Only the first range of a comma-separated list is considered.
///
/// # Returns
/// * `Ok(None)` — the header is not a byte range (the unit is not `bytes`,
///   compared ignoring ASCII case, or the first range has no `-`); the caller
///   should ignore the header.
/// * `Ok(Some(range))` — a satisfiable range, with `end` clamped to
///   `total - 1`. Supports `start-end`, `start-` and the suffix form `-len`
///   (a `len` larger than the body selects the whole body).
/// * `Err(())` — the range is malformed or cannot be satisfied: the body is
///   empty, a number is not plain digits, the suffix length is zero,
///   `start > end`, or `start >= total`.
fn parse_byte_range(header: &str, total: u64) -> Result<Option<ByteRange>, ()> {
    // Range unit names are case-insensitive, so accept `Bytes=`, `BYTES=`, ...
    let spec = match header.trim().split_once('=') {
        Some((unit, rest)) if unit.eq_ignore_ascii_case("bytes") => rest.trim(),
        _ => return Ok(None),
    };
    let first = spec.split(',').next().unwrap_or("").trim();
    let (start_s, end_s) = match first.split_once('-') {
        Some(parts) => parts,
        None => return Ok(None),
    };
    // No byte of an empty body can be selected; this also keeps the
    // `total - 1` computations below from underflowing.
    if total == 0 {
        return Err(());
    }
    let (start, end) = if start_s.is_empty() {
        // Suffix form `-N`: the last N bytes of the body.
        let suffix_len = parse_range_number(end_s)?;
        if suffix_len == 0 {
            return Err(());
        }
        let suffix_len = suffix_len.min(total);
        (total - suffix_len, total - 1)
    } else {
        let start = parse_range_number(start_s)?;
        let end = if end_s.trim().is_empty() {
            // Open-ended form `N-`: through the last byte.
            total - 1
        } else {
            parse_range_number(end_s)?.min(total - 1)
        };
        (start, end)
    };
    if start > end || start >= total {
        return Err(());
    }
    Ok(Some(ByteRange { start, end }))
}
/// Builds a response for `body`, honouring a `Range` request header if one is
/// present.
///
/// * No `Range` header, one that is not valid as a string, or one that
///   `parse_byte_range` says to ignore → `200 OK` with the whole body.
/// * A satisfiable range → `206 Partial Content` carrying just that slice and
///   a `Content-Range: bytes <start>-<end>/<total>` header.
/// * A range `parse_byte_range` rejects → `416 Range Not Satisfiable` with
///   `Content-Range: bytes */<total>` and no body.
///
/// Every response advertises `Accept-Ranges: bytes`; the 200 and 206 responses
/// use `content_type` as their `Content-Type`.
pub fn serve_bytes_with_range(
    http: &HttpRequest,
    body: Arc<bytes::Bytes>,
    content_type: &str,
) -> HttpResponse {
    use actix_web::http::header;

    let total = body.len() as u64;
    let parsed = http
        .headers()
        .get(header::RANGE)
        .and_then(|value| value.to_str().ok())
        .map(|raw| parse_byte_range(raw, total));

    // The client asked for a range that cannot be served.
    if let Some(Err(())) = parsed {
        return HttpResponse::RangeNotSatisfiable()
            .insert_header((header::CONTENT_RANGE, format!("bytes */{}", total)))
            .insert_header((header::ACCEPT_RANGES, "bytes"))
            .finish();
    }

    // The client asked for a range that can be served: send only that slice.
    if let Some(Ok(Some(ByteRange { start, end }))) = parsed {
        // `end` is inclusive, so the exclusive slice bound is `end + 1`.
        let slice = body.slice(start as usize..(end as usize + 1));
        return HttpResponse::PartialContent()
            .insert_header((header::CONTENT_TYPE, content_type.to_string()))
            .insert_header((header::ACCEPT_RANGES, "bytes"))
            .insert_header((
                header::CONTENT_RANGE,
                format!("bytes {}-{}/{}", start, end, total),
            ))
            .body(slice);
    }

    // No usable range: send the full payload.
    let whole: bytes::Bytes = (*body).clone();
    HttpResponse::Ok()
        .insert_header((header::CONTENT_TYPE, content_type.to_string()))
        .insert_header((header::ACCEPT_RANGES, "bytes"))
        .body(whole)
}