1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
use crate::Borsa;
use borsa_core::{
BorsaError, DownloadEntry, DownloadReport, DownloadResponse, HistoryRequest, Instrument,
};
use std::collections::HashSet;
// Validate that all instruments have unique symbols.
fn validate_unique_symbols(insts: &[Instrument]) -> Result<(), BorsaError> {
let mut seen = HashSet::new();
for inst in insts {
let symbol = inst.symbol().to_string();
if !seen.insert(symbol.clone()) {
return Err(BorsaError::InvalidArg(format!(
"duplicate symbol '{symbol}' in instruments list"
)));
}
}
Ok(())
}
/// Builder to orchestrate bulk history downloads for multiple symbols.
pub struct DownloadBuilder<'a> {
pub(crate) borsa: &'a Borsa,
pub(crate) instruments: Vec<Instrument>,
// Defer building a validated HistoryRequest until run(), to avoid panics on input.
pub(crate) range: Option<borsa_core::Range>,
pub(crate) period: Option<(i64, i64)>,
pub(crate) interval: borsa_core::Interval,
}
impl<'a> DownloadBuilder<'a> {
/// Create a new builder bound to a `Borsa` instance.
///
/// Behavior:
/// - Starts with an empty instrument list.
/// - Defers validation of range/period/interval until `run()`.
#[must_use]
pub const fn new(borsa: &'a Borsa) -> Self {
Self {
borsa,
instruments: Vec::new(),
range: Some(borsa_core::Range::M6),
period: None,
interval: borsa_core::Interval::D1,
}
}
/// Replace the instruments list.
///
/// Trade-offs: Replaces any previously added instruments; use `add_instrument`
/// if you need to append.
///
/// # Errors
/// Returns an error if duplicate symbols are detected in the provided instruments.
pub fn instruments(mut self, insts: &[Instrument]) -> Result<Self, BorsaError> {
validate_unique_symbols(insts)?;
self.instruments = insts.to_vec();
Ok(self)
}
/// Add a single instrument to the list.
///
/// # Errors
/// Returns an error if the instrument's symbol already exists in the list.
///
/// # Panics
/// Panics only if an internal invariant is broken whereby the just-pushed
/// instrument is missing; this cannot occur in normal use.
pub fn add_instrument(mut self, inst: Instrument) -> Result<Self, BorsaError> {
let mut combined = self.instruments.clone();
combined.push(inst);
if validate_unique_symbols(&combined).is_err() {
return Err(BorsaError::InvalidArg(format!(
"duplicate symbol '{}' already exists in instruments list",
combined.last().expect("pushed instrument exists").symbol()
)));
}
self.instruments = combined;
Ok(self)
}
/// Set a logical lookback range and clear any explicit period.
///
/// Behavior: Mutually exclusive with `period`; setting this clears an existing
/// explicit [start, end).
#[must_use]
pub const fn range(mut self, range: borsa_core::Range) -> Self {
self.range = Some(range);
self.period = None;
self
}
/// Set an explicit period [start, end) and clear any logical range.
///
/// Behavior: Mutually exclusive with `range`; setting this clears an existing
/// logical range.
#[must_use]
pub const fn period(mut self, start: i64, end: i64) -> Self {
self.period = Some((start, end));
self.range = None;
self
}
/// Select the desired history interval.
#[must_use]
pub const fn interval(mut self, interval: borsa_core::Interval) -> Self {
self.interval = interval;
self
}
/// Execute the download across eligible providers and aggregate results.
///
/// Behavior and trade-offs:
/// - Validates the request and then concurrently fetches per-symbol history using
/// the same merge/resample rules as `Borsa::history_with_attribution`.
/// - Populates the returned [`DownloadReport`] with a [`borsa_core::DownloadResponse`]
/// containing per-symbol candles, actions, and metadata keyed by symbol when at
/// least one instrument succeeds.
/// - Partial failures populate the `warnings` vector with `{symbol}: {error}` entries
/// without aborting the entire batch.
/// # Errors
/// Returns an error only if no instruments are specified or if an overall
/// request-level timeout elapses.
pub async fn run(self) -> Result<DownloadReport, BorsaError> {
if self.instruments.is_empty() {
return Err(BorsaError::InvalidArg(
"no instruments specified for download".into(),
));
}
// Defensive check for duplicates (should not happen if using the builder correctly)
validate_unique_symbols(&self.instruments)?;
// Build a validated HistoryRequest now; convert timestamp seconds safely.
let req: HistoryRequest = if let Some((start, end)) = self.period {
let start_dt = chrono::DateTime::from_timestamp(start, 0).ok_or_else(|| {
BorsaError::InvalidArg(format!("invalid start timestamp: {start}"))
})?;
let end_dt = chrono::DateTime::from_timestamp(end, 0)
.ok_or_else(|| BorsaError::InvalidArg(format!("invalid end timestamp: {end}")))?;
HistoryRequest::try_from_period(start_dt, end_dt, self.interval)?
} else {
let range = self.range.unwrap_or(borsa_core::Range::M6);
HistoryRequest::try_from_range(range, self.interval)?
};
let tasks = self.instruments.iter().map(|inst| {
let borsa = self.borsa;
let req = req.clone();
let inst = inst.clone();
async move {
match borsa.history_with_attribution(&inst, req).await {
Ok((hr, _attr)) => (inst, Ok(hr)),
Err(e) => (inst, Err(e)),
}
}
});
// Apply optional request-level deadline across the fan-out
let joined: Vec<(Instrument, Result<borsa_core::HistoryResponse, BorsaError>)> =
match crate::core::with_request_deadline(
self.borsa.cfg.request_timeout,
futures::future::join_all(tasks),
)
.await
{
Ok(v) => v,
Err(_) => return Err(BorsaError::request_timeout("download:history")),
};
let mut entries: Vec<DownloadEntry> = Vec::new();
let mut had_success = false;
let mut warnings: Vec<BorsaError> = Vec::new();
for (instrument, result) in joined {
match result {
Ok(resp) => {
had_success = true;
entries.push(DownloadEntry {
instrument,
history: resp,
});
}
Err(e) => {
// Preserve the original error, which is already connector-tagged upstream.
warnings.push(e);
}
}
}
let response = if had_success {
Some(DownloadResponse { entries })
} else {
None
};
Ok(DownloadReport { response, warnings })
}
}
impl Borsa {
/// Begin building a bulk download request.
///
/// Typical usage: chain `instruments`/`range`/`interval` then call `run()`.
#[must_use]
pub const fn download(&'_ self) -> DownloadBuilder<'_> {
DownloadBuilder::new(self)
}
}