1#![allow(dead_code)]
2
3use crate::{
4 bindings::*,
5 correlation_id::CorrelationId,
6 element::Element,
7 event::{Event, EventType},
8 name,
9 ref_data::RefData,
10 request::Request,
11 service::Service,
12 session_options::SessionOptions,
13 Error,
14};
15use std::collections::HashMap;
16use std::{ffi::CString, ptr};
17
/// Sizing budget for a single request: `ref_data` and `hist_data` put at most
/// `MAX_PENDING_REQUEST / <number of fields in the request>` securities in each request.
const MAX_PENDING_REQUEST: usize = 1_024;
/// Largest number of fields `ref_data` places in one `ReferenceDataRequest`.
const MAX_REFDATA_FIELDS: usize = 400;
/// Largest number of fields `hist_data` places in one `HistoricalDataRequest`.
const MAX_HISTDATA_FIELDS: usize = 25;
21
22pub struct Session {
23 ptr: *mut blpapi_Session_t,
24 correlation_count: u64,
25}
26
27impl Session {
28 fn from_options(options: SessionOptions) -> Self {
29 let handler: Option<
30 unsafe extern "C" fn(*mut blpapi_Event, *mut blpapi_Session, *mut std::ffi::c_void),
31 > = None;
32 let dispatcher: *mut blpapi_EventDispatcher = ptr::null_mut();
33 let user_data: *mut std::ffi::c_void = ptr::null_mut();
34 let ptr: *mut blpapi_Session =
35 unsafe { blpapi_Session_create(options.0, handler, dispatcher, user_data) };
36
37 Session {
38 ptr,
39 correlation_count: 0,
40 }
41 }
42
43 pub fn start(&mut self) -> Result<(), Error> {
44 let res: i32 = unsafe { blpapi_Session_start(self.ptr) };
45
46 Error::check(res)
47 }
48
49 pub fn stop(&mut self) -> Result<(), Error> {
50 let res: i32 = unsafe { blpapi_Session_stop(self.ptr) };
51
52 Error::check(res)
53 }
54
55 pub fn open_service(&mut self, service: &str) -> Result<(), Error> {
56 let service: CString = CString::new(service).unwrap();
57 let res: i32 = unsafe { blpapi_Session_openService(self.ptr, service.as_ptr()) };
58
59 Error::check(res)
60 }
61
62 pub fn get_service(&self, service: &str) -> Result<Service, Error> {
63 let name: CString = CString::new(service).unwrap();
64
65 let mut service: *mut blpapi_Service = ptr::null_mut();
66
67 let res: i32 =
68 unsafe { blpapi_Session_getService(self.ptr, &mut service as *mut _, name.as_ptr()) };
69
70 Error::check(res)?;
71
72 Ok(Service(service))
73 }
74
75 pub fn send(
76 &mut self,
77 request: Request,
78 correlation_id: Option<CorrelationId>,
79 ) -> Result<CorrelationId, Error> {
80 let mut correlation_id: CorrelationId =
81 correlation_id.unwrap_or_else(|| self.new_correlation_id());
82 let identity: *mut blpapi_Identity = ptr::null_mut();
83 let event_queue: *mut blpapi_EventQueue = ptr::null_mut();
84 let request_label: *mut i8 = ptr::null_mut();
85 let request_label_len: i32 = 0;
86
87 unsafe {
88 let res = blpapi_Session_sendRequest(
89 self.ptr,
90 request.ptr,
91 &mut correlation_id.0 as *mut _,
92 identity,
93 event_queue,
94 request_label,
95 request_label_len,
96 );
97
98 Error::check(res)?;
99
100 Ok(correlation_id)
101 }
102 }
103
104 fn new_correlation_id(&mut self) -> CorrelationId {
105 let id = CorrelationId::new_u64(self.correlation_count);
106
107 self.correlation_count += 1;
108
109 id
110 }
111}
112
113impl Drop for Session {
114 fn drop(&mut self) {
115 unsafe { blpapi_Session_destroy(self.ptr) }
116 }
117}
118
119pub struct SessionSync(Session);
120
121impl SessionSync {
122 pub fn from_options(options: SessionOptions) -> Self {
123 SessionSync(Session::from_options(options))
124 }
125
126 pub fn new() -> Result<Self, Error> {
127 let mut session = Self::from_options(SessionOptions::default());
128
129 session.start()?;
130
131 session.open_service("//blp/refdata")?;
132
133 Ok(session)
134 }
135
136 pub fn send(
137 &mut self,
138 request: Request,
139 correlation_id: Option<CorrelationId>,
140 ) -> Result<Events, Error> {
141 let _id = (&mut *self as &mut Session).send(request, correlation_id)?;
142
143 Ok(Events::new(self))
144 }
145
146 pub fn next_event(&mut self, timeout_ms: Option<u32>) -> Result<Event, Error> {
147 let mut event: *mut blpapi_Event = ptr::null_mut();
148
149 let timeout: u32 = timeout_ms.unwrap_or(0);
150
151 unsafe {
152 let res: i32 = blpapi_Session_nextEvent(self.0.ptr, &mut event as *mut _, timeout);
153
154 Error::check(res)?;
155
156 Ok(Event(event))
157 }
158 }
159
160 pub fn ref_data<I, R>(&mut self, securities: I) -> Result<HashMap<String, R>, Error>
161 where
162 I: IntoIterator,
163 I::Item: AsRef<str>,
164 R: RefData,
165 {
166 let service = self.get_service("//blp/refdata")?;
167 let mut ref_data: HashMap<String, R> = HashMap::new();
168 let mut iter: <I as IntoIterator>::IntoIter = securities.into_iter();
169
170 for fields in R::FIELDS.chunks(MAX_REFDATA_FIELDS) {
171 loop {
172 let mut request: Request = service.create_request("ReferenceDataRequest")?;
173
174 let mut is_empty: bool = true;
175
176 for security in iter.by_ref().take(MAX_PENDING_REQUEST / fields.len()) {
177 request.append_named(&name::SECURITIES, security.as_ref())?;
178 is_empty = false;
179 }
180
181 if is_empty {
182 break;
183 }
184
185 for field in fields {
186 request.append_named(&name::FIELDS_NAME, *field)?;
187 }
188
189 for event in self.send(request, None)? {
190 for message in event?.messages().map(|m| m.element()) {
191 if let Some(securities) = message.get_named_element(&name::SECURITY_DATA) {
192 for security in securities.values::<Element>() {
193 let ticker: String = security
194 .get_named_element(&name::SECURITY_NAME)
195 .and_then(|s: Element| s.get_at(0))
196 .unwrap_or_else(String::new);
197
198 if let Some(error) =
199 security.get_named_element(&name::SECURITY_ERROR)
200 {
201 return Err(Error::security(ticker, error));
202 }
203
204 let entry: &mut R = ref_data.entry(ticker).or_default();
205
206 if let Some(fields) = security.get_named_element(&name::FIELD_DATA)
207 {
208 for field in fields.elements() {
209 entry.on_field(&field.string_name(), &field);
210 }
211 }
212 }
213 }
214 }
215 }
216 }
217 }
218
219 Ok(ref_data)
220 }
221
222 pub fn hist_data<I, R>(
223 &mut self,
224 securities: I,
225 options: HistOptions,
226 ) -> Result<HashMap<String, TimeSerie<R>>, Error>
227 where
228 I: IntoIterator,
229 I::Item: AsRef<str>,
230 R: RefData,
231 {
232 let service = self.get_service("//blp/refdata")?;
233
234 let mut ref_data: HashMap<String, TimeSerie<R>> = HashMap::new();
235 let mut iter = securities.into_iter();
236
237 for fields in R::FIELDS.chunks(MAX_HISTDATA_FIELDS) {
238 loop {
239 let mut request = service.create_request("HistoricalDataRequest")?;
240 let mut is_empty = true;
241
242 for security in iter.by_ref().take(MAX_PENDING_REQUEST / fields.len()) {
243 request.append_named(&name::SECURITIES, security.as_ref())?;
244 is_empty = false;
245 }
246
247 if is_empty {
248 break;
249 }
250
251 for field in fields {
252 request.append_named(&name::FIELDS_NAME, *field)?;
253 }
254
255 options.apply(&mut request)?;
256
257 for event in self.send(request, None)? {
258 for message in event?.messages().map(|m| m.element()) {
259 if let Some(security) = message.get_named_element(&name::SECURITY_DATA) {
260 let ticker: String = security
261 .get_named_element(&name::SECURITY_NAME)
262 .and_then(|s: Element| s.get_at(0))
263 .unwrap_or_else(|| String::new());
264
265 if security.has_named_element(&name::SECURITY_ERROR) {
266 break;
267 }
268
269 if let Some(fields) = security.get_named_element(&name::FIELD_DATA) {
270 let entry: &mut TimeSerie<R> =
271 ref_data.entry(ticker).or_insert_with(|| {
272 let len: usize = fields.num_values();
273
274 TimeSerie::<_>::with_capacity(len)
275 });
276
277 for points in fields.values::<Element>() {
278 let mut value = R::default();
279
280 for field in points.elements() {
281 let name = &field.string_name();
282
283 if name == "date" {
284 } else {
286 value.on_field(name, &field);
287 }
288 }
289
290 entry.values.push(value);
291 }
292 }
293 }
294 }
295 }
296 }
297 }
298
299 Ok(ref_data)
300 }
301}
302
303impl std::ops::Deref for SessionSync {
304 type Target = Session;
305 fn deref(&self) -> &Session {
306 &self.0
307 }
308}
309
310impl std::ops::DerefMut for SessionSync {
311 fn deref_mut(&mut self) -> &mut Session {
312 &mut self.0
313 }
314}
315
316pub struct Events<'a> {
317 session: &'a mut SessionSync,
318 exit: bool,
319}
320
321impl<'a> Events<'a> {
322 fn new(session: &'a mut SessionSync) -> Self {
323 Events {
324 session,
325 exit: false,
326 }
327 }
328
329 fn try_next(&mut self) -> Result<Option<Event>, Error> {
330 if self.exit {
331 return Ok(None);
332 }
333 loop {
334 let event: Event = self.session.next_event(None)?;
335 let event_type: EventType = event.event_type();
336
337 match event_type {
338 EventType::PartialResponse => return Ok(Some(event)),
339 EventType::Response => {
340 self.exit = true;
341 return Ok(Some(event));
342 }
343 EventType::SessionStatus => {
344 if event.messages().map(|m| m.message_type()).any(|m| {
345 m == *name::SESSION_TERMINATED || m == *name::SESSION_STARTUP_FAILURE
346 }) {
347 return Ok(None);
348 }
349 }
350
351 EventType::Timeout => return Err(Error::TimeOut),
352
353 _ => (),
354 }
355 }
356 }
357}
358
359impl<'a> Iterator for Events<'a> {
360 type Item = Result<Event, Error>;
361
362 fn next(&mut self) -> Option<Result<Event, Error>> {
363 self.try_next().transpose()
364 }
365}
366
367#[derive(Debug, Default)]
368pub struct HistOptions {
369 start_date: String,
370 end_date: String,
371 periodicity_adjustment: Option<PeriodicityAdjustment>,
372 periodicity_selection: Option<PeriodicitySelection>,
373 max_data_points: Option<i32>,
374 currency: Option<String>,
375}
376
377impl HistOptions {
378 pub fn new<S: Into<String>, E: Into<String>>(start_date: S, end_date: E) -> Self {
379 HistOptions {
380 start_date: start_date.into(),
381 end_date: end_date.into(),
382 ..HistOptions::default()
383 }
384 }
385
386 pub fn with_periodicity_adjustment(
387 mut self,
388 periodicity_adjustment: PeriodicityAdjustment,
389 ) -> Self {
390 self.periodicity_adjustment = Some(periodicity_adjustment);
391 self
392 }
393
394 pub fn with_periodicity_selection(
395 mut self,
396 periodicity_selection: PeriodicitySelection,
397 ) -> Self {
398 self.periodicity_selection = Some(periodicity_selection);
399 self
400 }
401
402 pub fn with_max_points(mut self, max_data_points: i32) -> Self {
403 self.max_data_points = Some(max_data_points);
404 self
405 }
406
407 pub fn with_currency(mut self, currency: String) -> Self {
408 self.currency = Some(currency);
409 self
410 }
411
412 fn apply(&self, request: &mut Request) -> Result<(), Error> {
413 let mut element = request.element();
414
415 element.set("startDate", &self.start_date[..])?;
416 element.set("endDate", &self.end_date[..])?;
417
418 if let Some(periodicity_selection) = self.periodicity_selection {
419 element.set("periodicitySelection", periodicity_selection.as_str())?;
420 }
421
422 if let Some(periodicity_adjustment) = self.periodicity_adjustment {
423 element.set("periodicityAdjustment", periodicity_adjustment.as_str())?;
424 }
425
426 if let Some(max_data_points) = self.max_data_points {
427 element.set("maxDataPoints", max_data_points)?;
428 }
429
430 if let Some(currency) = self.currency.as_ref() {
431 element.set("currency", &**currency)?;
432 }
433
434 Ok(())
435 }
436}
437
438#[derive(Default, Debug)]
439pub struct TimeSerie<R> {
440 pub values: Vec<R>,
441 dates: Vec<chrono::NaiveDate>,
442}
443
444impl<R> TimeSerie<R> {
445 pub fn with_capacity(capacity: usize) -> Self {
446 TimeSerie {
447 dates: Vec::with_capacity(capacity),
448 values: Vec::with_capacity(capacity),
449 }
450 }
451}
452
/// Possible values of the `periodicityAdjustment` option of a historical data request.
#[derive(Debug, Clone, Copy)]
pub enum PeriodicityAdjustment {
    Actual,
    Calendar,
    Fiscal,
}

impl PeriodicityAdjustment {
    /// Returns the upper-case string that is written into the request for this variant.
    pub fn as_str(self) -> &'static str {
        use PeriodicityAdjustment::{Actual, Calendar, Fiscal};

        match self {
            Actual => "ACTUAL",
            Calendar => "CALENDAR",
            Fiscal => "FISCAL",
        }
    }
}
469
/// Possible values of the `periodicitySelection` option of a historical data request.
#[derive(Debug, Clone, Copy)]
pub enum PeriodicitySelection {
    Daily,
    Weekly,
    Monthly,
    Quarterly,
    SemiAnnually,
    Yearly,
}

impl PeriodicitySelection {
    /// Returns the upper-case string that is written into the request for this variant.
    pub fn as_str(self) -> &'static str {
        use PeriodicitySelection::{Daily, Monthly, Quarterly, SemiAnnually, Weekly, Yearly};

        match self {
            Daily => "DAILY",
            Weekly => "WEEKLY",
            Monthly => "MONTHLY",
            Quarterly => "QUARTERLY",
            SemiAnnually => "SEMIANNUALLY",
            Yearly => "YEARLY",
        }
    }
}
492
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a synchronous session from options pointing at `localhost:8194`.
    ///
    /// Only the construction is exercised: despite its name, the test body does not send
    /// any request.
    #[test]
    fn send_request() -> Result<(), Error> {
        let options = SessionOptions::default()
            .with_server_host("localhost")?
            .with_server_port(8194)?;

        let _session: SessionSync = options.sync();

        Ok(())
    }
}