use crate::Resampling;
use crate::{Attribution, Borsa, MergeStrategy, Span};
use borsa_core::{BorsaConnector, BorsaError, HistoryRequest, HistoryResponse};
type IndexedConnector = (usize, std::sync::Arc<dyn BorsaConnector>);
enum ResamplePlan {
Minutes(i64),
Daily,
Weekly,
}
type HistoryTaskResult = (
usize,
&'static str,
Result<HistoryResponse, BorsaError>,
Option<ResamplePlan>,
);
type HistoryOk = (usize, &'static str, HistoryResponse);
type CollectedHistory = (Vec<HistoryOk>, Vec<BorsaError>);
fn choose_effective_interval(
supported: &[borsa_core::Interval],
requested: borsa_core::Interval,
) -> Result<(borsa_core::Interval, Option<ResamplePlan>), BorsaError> {
use borsa_core::Interval;
if supported.contains(&requested) {
return Ok((requested, None));
}
if let Some(req_min) = requested.minutes() {
let mut best_divisor: Option<(Interval, i64)> = None; for &s in supported {
if let Some(m) = s.minutes()
&& m <= req_min
&& req_min % m == 0
&& best_divisor.as_ref().is_none_or(|&(_, bm)| m > bm)
{
best_divisor = Some((s, m));
}
}
if let Some((eff, _)) = best_divisor {
return Ok((eff, Some(ResamplePlan::Minutes(req_min))));
}
return Err(BorsaError::unsupported(
"history interval (intraday too fine for provider)",
));
}
match requested {
Interval::D1 => {
if supported.contains(&Interval::D1) {
Ok((Interval::D1, None))
} else {
let coarsest_intraday = supported
.iter()
.filter_map(|&iv| iv.minutes().map(|m| (iv, m)))
.max_by_key(|&(_, m)| m);
if let Some((eff, _)) = coarsest_intraday {
Ok((eff, Some(ResamplePlan::Daily)))
} else {
Err(BorsaError::unsupported(
"history interval (daily requires daily or intraday)",
))
}
}
}
Interval::W1 => {
if supported.contains(&Interval::W1) {
Ok((Interval::W1, None))
} else if supported.contains(&Interval::D1) {
Ok((Interval::D1, Some(ResamplePlan::Weekly)))
} else {
let coarsest_intraday = supported
.iter()
.filter_map(|&iv| iv.minutes().map(|m| (iv, m)))
.max_by_key(|&(_, m)| m);
if let Some((eff, _)) = coarsest_intraday {
Ok((eff, Some(ResamplePlan::Weekly)))
} else {
Err(BorsaError::unsupported(
"history interval (weekly requires weekly/daily/intraday)",
))
}
}
}
_ => Ok((requested, None)),
}
}
impl Borsa {
async fn fetch_joined_history(
&self,
eligible: &[(usize, std::sync::Arc<dyn BorsaConnector>)],
inst: &borsa_core::Instrument,
req_copy: HistoryRequest,
) -> Result<Vec<HistoryTaskResult>, BorsaError> {
let make_future = || async {
match self.cfg.merge_history_strategy {
MergeStrategy::Deep => Ok(Self::parallel_history(
eligible,
inst,
&req_copy,
self.cfg.provider_timeout,
)
.await),
MergeStrategy::Fallback => Ok(Self::sequential_history(
eligible.to_vec(),
inst,
req_copy,
self.cfg.provider_timeout,
)
.await),
_ => Err(BorsaError::InvalidArg(
"unknown merge strategy (upgrade borsa to support this variant)".into(),
)),
}
};
(crate::core::with_request_deadline(self.cfg.request_timeout, make_future()).await)
.unwrap_or_else(|_| Err(BorsaError::request_timeout("history")))
}
fn finalize_history_results(
&self,
joined: Vec<HistoryTaskResult>,
symbol: &str,
) -> Result<(HistoryResponse, Attribution), BorsaError> {
let attempts = joined.len();
let (mut results_ord, errors) = Self::collect_successes(joined);
if results_ord.is_empty() {
if errors.is_empty() {
return Err(BorsaError::not_found(format!("history for {symbol}")));
}
if errors.len() == attempts
&& errors
.iter()
.all(|e| matches!(e, BorsaError::ProviderTimeout { .. }))
{
return Err(BorsaError::AllProvidersTimedOut {
capability: "history".to_string(),
});
}
return Err(BorsaError::AllProvidersFailed(errors));
}
self.order_results(&mut results_ord);
let filtered_ord: Vec<HistoryOk> = self.filter_adjustedness(results_ord);
let results: Vec<(&'static str, HistoryResponse)> =
filtered_ord.into_iter().map(|(_, n, hr)| (n, hr)).collect();
let attr = Self::build_attribution(&results, symbol);
let mut merged = Self::merge_history_or_tag_connector_error(&results)?;
self.apply_final_resample(&mut merged)?;
Ok((merged, attr))
}
fn filter_adjustedness(&self, results_ord: Vec<HistoryOk>) -> Vec<HistoryOk> {
if results_ord.is_empty() {
return Vec::new();
}
if self.cfg.prefer_adjusted_history && results_ord.iter().any(|(_, _, hr)| hr.adjusted) {
return results_ord
.into_iter()
.filter(|(_, _, hr)| hr.adjusted)
.collect();
}
let target_adjusted = results_ord.first().is_some_and(|(_, _, hr)| hr.adjusted);
results_ord
.into_iter()
.filter(|(_, _, hr)| hr.adjusted == target_adjusted)
.collect()
}
fn merge_history_or_tag_connector_error(
results: &[(&'static str, HistoryResponse)],
) -> Result<HistoryResponse, BorsaError> {
if results.len() == 1 {
return Ok(results.first().unwrap().1.clone());
}
match borsa_core::timeseries::merge::merge_history(results.iter().cloned().map(|(_, r)| r))
{
Ok(mut m) => {
borsa_core::timeseries::util::strip_unadjusted(&mut m.candles);
Ok(m)
}
Err(borsa_core::BorsaError::Data(msg))
if msg == "Connector provided mixed-currency history" =>
{
Err(Self::identify_faulty_provider(results))
}
Err(e) => Err(e),
}
}
fn identify_faulty_provider(
results: &[(&'static str, HistoryResponse)],
) -> borsa_core::BorsaError {
use std::collections::HashMap;
enum CurrencyState {
NoData,
Consistent(borsa_core::Currency),
Inconsistent,
}
let mut per_provider_currency: HashMap<&'static str, CurrencyState> = HashMap::new();
for (name, hr) in results {
let mut cur: Option<borsa_core::Currency> = None;
let mut state = CurrencyState::NoData;
for c in &hr.candles {
if borsa_core::timeseries::util::ensure_candle_currency_uniform(c).is_err() {
state = CurrencyState::Inconsistent;
break;
}
let oc = c.open.currency().clone();
if cur.as_ref().is_some_and(|prev| prev != &oc) {
state = CurrencyState::Inconsistent;
break;
}
cur.get_or_insert(oc);
}
if !matches!(state, CurrencyState::Inconsistent) {
state = cur.map_or(CurrencyState::NoData, CurrencyState::Consistent);
}
per_provider_currency.insert(*name, state);
}
if let Some((bad_name, _)) = per_provider_currency
.iter()
.find(|(_, v)| matches!(v, CurrencyState::Inconsistent))
{
return borsa_core::BorsaError::Connector {
connector: (*bad_name).to_string(),
msg: "Provider returned inconsistent currency data".to_string(),
};
}
let mut counts: HashMap<borsa_core::Currency, usize> = HashMap::new();
for v in per_provider_currency.values() {
if let CurrencyState::Consistent(cur) = v {
*counts.entry(cur.clone()).or_insert(0) += 1;
}
}
let majority = counts.into_iter().max_by_key(|(_, c)| *c).map(|(k, _)| k);
if let Some(maj) = majority
&& let Some(bad_name) = per_provider_currency.iter().find_map(|(n, v)| match v {
CurrencyState::Consistent(cur) if cur != &maj => Some(*n),
_ => None,
})
{
return borsa_core::BorsaError::Connector {
connector: bad_name.to_string(),
msg: "Provider returned inconsistent currency data".to_string(),
};
}
let fallback = results.last().map_or("unknown", |(n, _)| *n);
borsa_core::BorsaError::Connector {
connector: fallback.to_string(),
msg: "Provider returned inconsistent currency data".to_string(),
}
}
pub async fn history(
&self,
inst: &borsa_core::Instrument,
req: HistoryRequest,
) -> Result<HistoryResponse, BorsaError> {
let (merged, _attr) = self.history_with_attribution(inst, req).await?;
Ok(merged)
}
pub async fn history_with_attribution(
&self,
inst: &borsa_core::Instrument,
req: HistoryRequest,
) -> Result<(HistoryResponse, Attribution), BorsaError> {
let eligible = self.eligible_history_connectors(inst)?;
let req_copy = req;
let joined = self.fetch_joined_history(&eligible, inst, req_copy).await?;
self.finalize_history_results(joined, inst.symbol_str())
}
}
impl Borsa {
fn eligible_history_connectors(
&self,
inst: &borsa_core::Instrument,
) -> Result<Vec<IndexedConnector>, BorsaError> {
let ordered = self.ordered(inst);
let mut eligible: Vec<(usize, std::sync::Arc<dyn BorsaConnector>)> = Vec::new();
for (idx, c) in ordered.into_iter().enumerate() {
if c.supports_kind(*inst.kind()) && c.as_history_provider().is_some() {
eligible.push((idx, c));
}
}
if eligible.is_empty() {
return Err(BorsaError::unsupported("history"));
}
Ok(eligible)
}
async fn parallel_history(
eligible: &[(usize, std::sync::Arc<dyn BorsaConnector>)],
inst: &borsa_core::Instrument,
req_copy: &HistoryRequest,
provider_timeout: std::time::Duration,
) -> Vec<HistoryTaskResult> {
let tasks = eligible.iter().map(|(idx, c)| {
Self::spawn_history_task(*idx, c.clone(), inst.clone(), req_copy, provider_timeout)
});
futures::future::join_all(tasks).await
}
fn build_effective_request(
c: &std::sync::Arc<dyn BorsaConnector>,
kind: borsa_core::AssetKind,
req_copy: &HistoryRequest,
) -> Result<(HistoryRequest, Option<ResamplePlan>), BorsaError> {
let supported = c
.as_history_provider()
.expect("checked is_some above")
.supported_history_intervals(kind)
.to_vec();
let (effective_interval, resample_plan) =
choose_effective_interval(&supported, req_copy.interval())?;
let mut b = borsa_core::HistoryRequestBuilder::default();
if let Some(r) = req_copy.range() {
b = b.range(r);
} else if let Some((s, e)) = req_copy.period() {
b = b.period(s, e);
}
b = b.interval(effective_interval);
b = b.include_prepost(req_copy.include_prepost());
b = b.include_actions(req_copy.include_actions());
b = b.auto_adjust(req_copy.auto_adjust());
b = b.keepna(req_copy.keepna());
let eff_req = b.build()?;
Ok((eff_req, resample_plan))
}
fn spawn_history_task(
idx: usize,
c: std::sync::Arc<dyn BorsaConnector>,
inst: borsa_core::Instrument,
req_copy: &HistoryRequest,
provider_timeout: std::time::Duration,
) -> impl std::future::Future<
Output = (
usize,
&'static str,
Result<HistoryResponse, BorsaError>,
Option<ResamplePlan>,
),
> {
let kind = *inst.kind();
async move {
let (eff_req, resample_target_min) =
match Self::build_effective_request(&c, kind, req_copy) {
Ok(v) => v,
Err(e) => return (idx, c.name(), Err(e), None),
};
let fut = c
.as_history_provider()
.expect("checked is_some above")
.history(&inst, eff_req);
let resp =
Self::provider_call_with_timeout(c.name(), "history", provider_timeout, fut).await;
(idx, c.name(), resp, resample_target_min)
}
}
async fn sequential_history(
eligible: Vec<IndexedConnector>,
inst: &borsa_core::Instrument,
req_copy: HistoryRequest,
provider_timeout: std::time::Duration,
) -> Vec<HistoryTaskResult> {
let mut results = Vec::new();
for (idx, c) in eligible {
let (eff_req, resample_target_min) =
match Self::build_effective_request(&c, *inst.kind(), &req_copy) {
Ok(v) => v,
Err(e) => {
let result = (idx, c.name(), Err(e), None);
results.push(result);
continue;
}
};
let fut = c
.as_history_provider()
.expect("checked is_some above")
.history(inst, eff_req);
let resp =
Self::provider_call_with_timeout(c.name(), "history", provider_timeout, fut).await;
let result = (idx, c.name(), resp, resample_target_min);
if let Ok(ref hr) = result.2
&& !hr.candles.is_empty()
{
results.push(result);
break;
}
results.push(result);
}
results
}
fn collect_successes(joined: Vec<HistoryTaskResult>) -> CollectedHistory {
let mut results_ord: Vec<HistoryOk> = Vec::new();
let mut errors: Vec<BorsaError> = Vec::new();
for (idx, name, res, resample_target_min) in joined {
match res {
Ok(mut hr) if !hr.candles.is_empty() => {
if let Some(plan) = resample_target_min {
match plan {
ResamplePlan::Minutes(mins) => {
match borsa_core::timeseries::resample::resample_to_minutes_with_meta(
std::mem::take(&mut hr.candles),
mins,
hr.meta.as_ref(),
) {
Ok(c) => hr.candles = c,
Err(e) => {
errors.push(crate::core::tag_err(name, e));
continue;
}
}
}
ResamplePlan::Daily => {
match borsa_core::timeseries::resample::resample_to_daily_with_meta(
std::mem::take(&mut hr.candles),
hr.meta.as_ref(),
) {
Ok(c) => hr.candles = c,
Err(e) => {
errors.push(crate::core::tag_err(name, e));
continue;
}
}
}
ResamplePlan::Weekly => {
match borsa_core::timeseries::resample::resample_to_weekly_with_meta(
std::mem::take(&mut hr.candles),
hr.meta.as_ref(),
) {
Ok(c) => hr.candles = c,
Err(e) => {
errors.push(crate::core::tag_err(name, e));
continue;
}
}
}
}
}
results_ord.push((idx, name, hr));
}
Ok(_) | Err(borsa_core::BorsaError::NotFound { .. }) => {}
Err(e) => errors.push(crate::core::tag_err(name, e)),
}
}
(results_ord, errors)
}
fn order_results(&self, results_ord: &mut Vec<HistoryOk>) {
if self.cfg.prefer_adjusted_history {
results_ord.sort_by_key(|(idx, _, hr)| (!hr.adjusted, *idx));
} else {
results_ord.sort_by_key(|(idx, _, _)| *idx);
}
}
fn build_attribution(results: &[(&'static str, HistoryResponse)], symbol: &str) -> Attribution {
use std::collections::BTreeMap;
let mut ts_to_provider: BTreeMap<chrono::DateTime<chrono::Utc>, &'static str> =
BTreeMap::new();
for (name, hr) in results {
for c in &hr.candles {
ts_to_provider.entry(c.ts).or_insert(*name);
}
}
let mut attr = Attribution::new(symbol.to_string());
let mut iter = ts_to_provider.into_iter();
if let Some((first_ts, mut current_provider)) = iter.next() {
let mut run_start = first_ts.timestamp();
let mut last_ts = first_ts.timestamp();
for (ts, provider) in iter {
let ts_sec = ts.timestamp();
if provider == current_provider {
} else {
attr.push((
current_provider,
Span {
start: run_start,
end: last_ts,
},
));
current_provider = provider;
run_start = ts_sec;
}
last_ts = ts_sec;
}
attr.push((
current_provider,
Span {
start: run_start,
end: last_ts,
},
));
}
attr
}
fn apply_final_resample(&self, merged: &mut HistoryResponse) -> Result<(), BorsaError> {
let will_resample = if !matches!(self.cfg.resampling, Resampling::None) {
true
} else if self.cfg.auto_resample_subdaily_to_daily {
borsa_core::timeseries::infer::is_subdaily(&merged.candles)
} else {
false
};
if matches!(self.cfg.resampling, Resampling::Weekly) {
let new_candles = borsa_core::timeseries::resample::resample_to_weekly_with_meta(
std::mem::take(&mut merged.candles),
merged.meta.as_ref(),
)?;
merged.candles = new_candles;
} else if matches!(self.cfg.resampling, Resampling::Daily)
|| (self.cfg.auto_resample_subdaily_to_daily
&& borsa_core::timeseries::infer::is_subdaily(&merged.candles))
{
let new_candles = borsa_core::timeseries::resample::resample_to_daily_with_meta(
std::mem::take(&mut merged.candles),
merged.meta.as_ref(),
)?;
merged.candles = new_candles;
}
if will_resample {
borsa_core::timeseries::util::strip_unadjusted(&mut merged.candles);
}
Ok(())
}
}