use std::sync::LazyLock;
use brk_error::{Error, Result};
use brk_traversable::TreeNode;
use brk_types::{
BlockHashPrefix, CacheClass, Date, DetailedSeriesCount, Epoch, Format, Halving, Height, Index,
IndexInfo, LegacyValue, Limit, Output, OutputLegacy, PaginatedSeries, Pagination, RangeIndex,
RangeMap, SearchQuery, SeriesData, SeriesInfo, SeriesName, SeriesOutput, SeriesOutputLegacy,
SeriesSelection, Timestamp, Version,
};
use parking_lot::RwLock;
use vecdb::{AnyExportableVec, ReadableVec};
use crate::Query;
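// Cached mapping from monotonic block timestamp to block height, built lazily
// and rebuilt once the chain tip moves past its current length (see
// `height_for_timestamp` below).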
static HEIGHT_BY_MONOTONIC_TIMESTAMP: LazyLock<RwLock<RangeMap<Timestamp, Height>>> =
LazyLock::new(|| RwLock::new(RangeMap::default()));
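// Rough per-header/per-cell byte estimates used to pre-size CSV and JSON output buffers.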
const CSV_HEADER_BYTES_PER_COL: usize = 10;
const CSV_CELL_BYTES: usize = 15;
const JSON_CELL_BYTES: usize = 12;
impl Query {
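/// Returns the series names matching the search query, capped at the query's limit.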
pub fn search_series(&self, query: &SearchQuery) -> Vec<&'static str> {
self.vecs().matches(&query.q, query.limit)
}
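/// Builds the error for a series that could not be resolved: if the series exists
/// but not under the requested index, lists the supported index endpoints;
/// otherwise returns a not-found error with up to three close-match suggestions.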
pub fn series_not_found_error(&self, series: &SeriesName) -> Error {
if let Some(indexes) = self.vecs().series_to_indexes(series) {
let supported = indexes
.iter()
.map(|i| format!("/api/series/{series}/{}", i.name()))
.collect::<Vec<_>>()
.join(", ");
return Error::SeriesUnsupportedIndex {
series: brk_error::truncate_series_name(series.to_string()),
supported,
};
}
let matches = self.vecs().matches(series, Limit::DEFAULT);
let total_matches = matches.len();
let suggestions = matches.into_iter().take(3).collect();
Error::SeriesNotFound(brk_error::SeriesNotFound::new(
series.to_string(),
suggestions,
total_matches,
))
}
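/// Writes the selected columns as CSV: one header row of column names, then one
/// row per value in the `start..end` range.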
pub(crate) fn columns_to_csv(
columns: &[&dyn AnyExportableVec],
start: usize,
end: usize,
) -> Result<String> {
if columns.is_empty() {
return Ok(String::new());
}
let num_cols = columns.len();
let mut csv = String::with_capacity(num_cols * CSV_HEADER_BYTES_PER_COL);
for (i, col) in columns.iter().enumerate() {
if i > 0 {
csv.push(',');
}
csv.push_str(col.name());
}
csv.push('\n');
if num_cols == 1 {
columns[0].write_csv_column(Some(start), Some(end), &mut csv)?;
return Ok(csv);
}
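// Multi-column case: pre-size the buffer from the row count and interleave
// values row by row using one writer per column.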
let from = Some(start as i64);
let to = Some(end as i64);
let num_rows = columns[0].range_count(from, to);
csv.reserve(num_rows * num_cols * CSV_CELL_BYTES);
let mut writers: Vec<_> = columns
.iter()
.map(|col| col.create_writer(from, to))
.collect();
for _ in 0..num_rows {
for (i, writer) in writers.iter_mut().enumerate() {
if i > 0 {
csv.push(',');
}
writer.write_next(&mut csv)?;
}
csv.push('\n');
}
Ok(csv)
}
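/// Looks up the backing vector for a series under the given index, mapping a
/// miss to the appropriate not-found error.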
fn get_vec(&self, series: &SeriesName, index: Index) -> Result<&'static dyn AnyExportableVec> {
self.vecs()
.get(series, index)
.ok_or_else(|| self.series_not_found_error(series))
}
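/// Returns the most recent value of the series as JSON, or `Error::NoData` if the
/// series is empty.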
pub fn latest(&self, series: &SeriesName, index: Index) -> Result<serde_json::Value> {
self.get_vec(series, index)?
.last_json_value()
.ok_or(Error::NoData)
}
pub fn len(&self, series: &SeriesName, index: Index) -> Result<usize> {
Ok(self.get_vec(series, index)?.len())
}
pub fn version(&self, series: &SeriesName, index: Index) -> Result<Version> {
Ok(self.get_vec(series, index)?.version())
}
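/// Resolves every requested series to its backing vector for the selected index,
/// failing if the selection is empty or any series is unknown.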
pub fn search(&self, params: &SeriesSelection) -> Result<Vec<&'static dyn AnyExportableVec>> {
if params.series.is_empty() {
return Err(Error::NoSeries);
}
params
.series
.iter()
.map(|s| self.get_vec(s, params.index))
.collect()
}
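/// Sums the per-vector weight of reading the given range, used to enforce the
/// per-request weight budget.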
pub fn weight(vecs: &[&dyn AnyExportableVec], from: Option<i64>, to: Option<i64>) -> usize {
vecs.iter().map(|v| v.range_weight(from, to)).sum()
}
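/// Resolves a series selection into concrete vectors and bounds: translates
/// date/timestamp bounds into offsets, applies the optional limit, enforces the
/// weight budget, and records the tip hash prefix and stable count.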
pub fn resolve(&self, params: SeriesSelection, max_weight: usize) -> Result<ResolvedQuery> {
let vecs = self.search(&params)?;
let total = vecs.iter().map(|v| v.len()).min().unwrap_or(0);
let version: Version = vecs.iter().map(|v| v.version()).sum();
let index = params.index;
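// Translate a range bound into a concrete offset; take the smallest mapping
// across the selected vectors, or the fallback if none resolves.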
let resolve_bound = |ri: RangeIndex, fallback: usize| -> Result<usize> {
let i = self.range_index_to_i64(ri, index)?;
Ok(vecs
.iter()
.map(|v| v.i64_to_usize(i))
.min()
.unwrap_or(fallback))
};
let start = match params.start() {
Some(ri) => resolve_bound(ri, 0)?,
None => 0,
};
let end = match params.end() {
Some(ri) => resolve_bound(ri, total)?,
None => params
.limit()
.map(|l| start.saturating_add(*l).min(total))
.unwrap_or(total),
};
let end = end.max(start);
let weight = Self::weight(&vecs, Some(start as i64), Some(end as i64));
if weight > max_weight {
return Err(Error::WeightExceeded {
requested: weight,
max: max_weight,
});
}
let tip_height = self.height();
let hash_prefix = self.tip_hash_prefix();
let stable_count = self.stable_count(params.index, total, tip_height);
Ok(ResolvedQuery {
vecs,
format: params.format(),
index: params.index,
version,
total,
start,
end,
hash_prefix,
stable_count,
})
}
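/// Number of leading entries that can be considered final for the given index,
/// if any: bucket indexes subtract their margin, entity indexes take the first
/// entity created six blocks below the tip (presumably a reorg-safety margin),
/// and mutable indexes have no stable prefix.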
pub fn stable_count(&self, index: Index, total: usize, tip_height: Height) -> Option<usize> {
match index.cache_class() {
CacheClass::Bucket { margin } => Some(total.saturating_sub(margin)),
CacheClass::Entity => {
let h = Height::from((*tip_height).saturating_sub(6));
Some(self.entity_index_at(index, h).unwrap_or(0).min(total))
}
CacheClass::Mutable => None,
}
}
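/// Looks up the first entity index (transaction, input, output, address, ...)
/// recorded at the given height for an entity-keyed index.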
fn entity_index_at(&self, index: Index, h: Height) -> Option<usize> {
let v = &self.indexer().vecs;
match index {
Index::TxIndex => v
.transactions
.first_tx_index
.collect_one(h)
.map(usize::from),
Index::TxInIndex => v.inputs.first_txin_index.collect_one(h).map(usize::from),
Index::TxOutIndex => v.outputs.first_txout_index.collect_one(h).map(usize::from),
Index::EmptyOutputIndex => v.scripts.empty.first_index.collect_one(h).map(usize::from),
Index::OpReturnIndex => v
.scripts
.op_return
.first_index
.collect_one(h)
.map(usize::from),
Index::P2MSOutputIndex => v.scripts.p2ms.first_index.collect_one(h).map(usize::from),
Index::UnknownOutputIndex => v
.scripts
.unknown
.first_index
.collect_one(h)
.map(usize::from),
Index::P2AAddrIndex => v.addrs.p2a.first_index.collect_one(h).map(usize::from),
Index::P2PK33AddrIndex => v.addrs.p2pk33.first_index.collect_one(h).map(usize::from),
Index::P2PK65AddrIndex => v.addrs.p2pk65.first_index.collect_one(h).map(usize::from),
Index::P2PKHAddrIndex => v.addrs.p2pkh.first_index.collect_one(h).map(usize::from),
Index::P2SHAddrIndex => v.addrs.p2sh.first_index.collect_one(h).map(usize::from),
Index::P2TRAddrIndex => v.addrs.p2tr.first_index.collect_one(h).map(usize::from),
Index::P2WPKHAddrIndex => v.addrs.p2wpkh.first_index.collect_one(h).map(usize::from),
Index::P2WSHAddrIndex => v.addrs.p2wsh.first_index.collect_one(h).map(usize::from),
_ => unreachable!("entity_index_at called for non-Entity Index: {index:?}"),
}
}
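/// Serializes a resolved query either as CSV or as a JSON array of typed
/// `SeriesData` values.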
pub fn format(&self, resolved: ResolvedQuery) -> Result<SeriesOutput> {
let ResolvedQuery {
vecs,
format,
index,
version,
total,
start,
end,
..
} = resolved;
let output = match format {
Format::CSV => Output::CSV(Self::columns_to_csv(&vecs, start, end)?),
Format::JSON => {
let count = end.saturating_sub(start);
Output::Json(Self::write_json_array(&vecs, count, 256, |v, buf| {
SeriesData::serialize(v, index, start, end, buf)
})?)
}
};
Ok(SeriesOutput {
output,
version,
total,
start,
end,
})
}
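/// Like `format`, but JSON output writes the raw vector values without the
/// `SeriesData` wrapper; CSV requests fall through to `format`.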
pub fn format_raw(&self, resolved: ResolvedQuery) -> Result<SeriesOutput> {
if resolved.format == Format::CSV {
return self.format(resolved);
}
let ResolvedQuery {
vecs,
version,
total,
start,
end,
..
} = resolved;
let count = end.saturating_sub(start);
let buf = Self::write_json_array(&vecs, count, 2, |v, buf| {
v.write_json(Some(start), Some(end), buf)
})?;
Ok(SeriesOutput {
output: Output::Json(buf),
version,
total,
start,
end,
})
}
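/// Writes one JSON value per vector into a pre-sized buffer, wrapping them in an
/// outer array when more than one series is requested.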
fn write_json_array(
vecs: &[&dyn AnyExportableVec],
cell_count: usize,
wrapper_overhead: usize,
mut write_one: impl FnMut(&dyn AnyExportableVec, &mut Vec<u8>) -> vecdb::Result<()>,
) -> Result<Vec<u8>> {
let multi = vecs.len() > 1;
let mut buf =
Vec::with_capacity(cell_count * JSON_CELL_BYTES * vecs.len() + wrapper_overhead);
if multi {
buf.push(b'[');
}
for (i, vec) in vecs.iter().enumerate() {
if i > 0 {
buf.push(b',');
}
write_one(*vec, &mut buf)?;
}
if multi {
buf.push(b']');
}
Ok(buf)
}
pub fn series_count(&self) -> DetailedSeriesCount {
DetailedSeriesCount {
total: self.vecs().counts.clone(),
by_db: self.vecs().counts_by_db.clone(),
}
}
pub fn indexes(&self) -> &'static [IndexInfo] {
&self.vecs().indexes
}
pub fn series_list(&self, pagination: Pagination) -> PaginatedSeries {
self.vecs().series(pagination)
}
pub fn series_catalog(&self) -> &'static TreeNode {
self.vecs().catalog()
}
pub fn series_info(&self, series: &SeriesName) -> Option<SeriesInfo> {
self.vecs().series_info(series)
}
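/// Converts a user-supplied range bound (integer, date, or timestamp) into an
/// offset for the given index.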
fn range_index_to_i64(&self, ri: RangeIndex, index: Index) -> Result<i64> {
match ri {
RangeIndex::Int(i) => Ok(i),
RangeIndex::Date(date) => self.date_to_i64(date, index),
RangeIndex::Timestamp(ts) => self.timestamp_to_i64(ts, index),
}
}
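/// Maps a date to an index offset, falling back to timestamp resolution for
/// indexes without a direct date mapping.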
fn date_to_i64(&self, date: Date, index: Index) -> Result<i64> {
if let Some(idx) = index.date_to_index(date) {
return Ok(idx as i64);
}
self.timestamp_to_i64(Timestamp::from(date), index)
}
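/// Maps a timestamp to an index offset; for height-derived indexes the timestamp
/// is first resolved to a block height, while other indexes reject date/time ranges.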
fn timestamp_to_i64(&self, ts: Timestamp, index: Index) -> Result<i64> {
if let Some(idx) = index.timestamp_to_index(ts) {
return Ok(idx as i64);
}
let height = Height::from(self.height_for_timestamp(ts));
match index {
Index::Height => Ok(usize::from(height) as i64),
Index::Epoch => Ok(usize::from(Epoch::from(height)) as i64),
Index::Halving => Ok(usize::from(Halving::from(height)) as i64),
_ => Err(Error::Parse(format!(
"date/timestamp ranges not supported for index '{index}'"
))),
}
}
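/// Resolves a timestamp to a block height via the cached monotonic-timestamp map,
/// rebuilding the map when it no longer covers the current tip.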
fn height_for_timestamp(&self, ts: Timestamp) -> usize {
let current_height: usize = self.height().into();
let lookup = |map: &RangeMap<Timestamp, Height>| {
map.ceil(ts).map(usize::from).unwrap_or(current_height)
};
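// Fast path: under the read lock, use the cached map if it already covers the tip.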
{
let map = HEIGHT_BY_MONOTONIC_TIMESTAMP.read();
if map.len() > current_height {
return lookup(&map);
}
}
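// Slow path: take the write lock and rebuild, re-checking so concurrent callers
// rebuild at most once.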
let mut map = HEIGHT_BY_MONOTONIC_TIMESTAMP.write();
if map.len() <= current_height {
*map = RangeMap::from(self.computer().indexes.timestamp.monotonic.collect());
}
lookup(&map)
}
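/// Serializes a resolved query in the legacy output shape: a bare value for a
/// single-entry series, a list for one series, or a matrix for several.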
pub fn format_legacy(&self, resolved: ResolvedQuery) -> Result<SeriesOutputLegacy> {
let ResolvedQuery {
vecs,
format,
version,
total,
start,
end,
..
} = resolved;
if vecs.is_empty() {
return Ok(SeriesOutputLegacy {
output: OutputLegacy::default(format),
version: Version::ZERO,
total: 0,
start: 0,
end: 0,
});
}
let from = Some(start as i64);
let to = Some(end as i64);
let output = match format {
Format::CSV => OutputLegacy::CSV(Self::columns_to_csv(&vecs, start, end)?),
Format::JSON => {
if vecs.len() == 1 {
let col = vecs[0];
let count = col.range_count(from, to);
let mut buf = Vec::new();
if count == 1 {
col.write_json_value(Some(start), &mut buf)?;
OutputLegacy::Json(LegacyValue::Value(buf))
} else {
col.write_json(Some(start), Some(end), &mut buf)?;
OutputLegacy::Json(LegacyValue::List(buf))
}
} else {
let mut values = Vec::with_capacity(vecs.len());
for vec in &vecs {
let mut buf = Vec::new();
vec.write_json(Some(start), Some(end), &mut buf)?;
values.push(buf);
}
OutputLegacy::Json(LegacyValue::Matrix(values))
}
}
};
Ok(SeriesOutputLegacy {
output,
version,
total,
start,
end,
})
}
}
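/// A series selection resolved into concrete vectors, bounds, and the tip
/// metadata captured at resolution time.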
pub struct ResolvedQuery {
pub vecs: Vec<&'static dyn AnyExportableVec>,
pub format: Format,
pub index: Index,
pub version: Version,
pub total: usize,
pub start: usize,
pub end: usize,
pub hash_prefix: BlockHashPrefix,
pub stable_count: Option<usize>,
}
impl ResolvedQuery {
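/// Suggested filename for CSV downloads: underscore-joined series names plus the index.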
pub fn csv_filename(&self) -> String {
let names: Vec<_> = self.vecs.iter().map(|v| v.name()).collect();
format!("{}-{}.csv", names.join("_"), self.index)
}
}