bloomberg/
session.rs

1#![allow(dead_code)]
2
3use crate::{
4    bindings::*,
5    correlation_id::CorrelationId,
6    element::Element,
7    event::{Event, EventType},
8    name,
9    ref_data::RefData,
10    request::Request,
11    service::Service,
12    session_options::SessionOptions,
13    Error,
14};
15use std::collections::HashMap;
16use std::{ffi::CString, ptr};
17
18const MAX_PENDING_REQUEST: usize = 1024;
19const MAX_REFDATA_FIELDS: usize = 400;
20const MAX_HISTDATA_FIELDS: usize = 25;
21
22pub struct Session {
23    ptr: *mut blpapi_Session_t,
24    correlation_count: u64,
25}
26
27impl Session {
28    fn from_options(options: SessionOptions) -> Self {
29        let handler: Option<
30            unsafe extern "C" fn(*mut blpapi_Event, *mut blpapi_Session, *mut std::ffi::c_void),
31        > = None;
32        let dispatcher: *mut blpapi_EventDispatcher = ptr::null_mut();
33        let user_data: *mut std::ffi::c_void = ptr::null_mut();
34        let ptr: *mut blpapi_Session =
35            unsafe { blpapi_Session_create(options.0, handler, dispatcher, user_data) };
36
37        Session {
38            ptr,
39            correlation_count: 0,
40        }
41    }
42
43    pub fn start(&mut self) -> Result<(), Error> {
44        let res: i32 = unsafe { blpapi_Session_start(self.ptr) };
45
46        Error::check(res)
47    }
48
49    pub fn stop(&mut self) -> Result<(), Error> {
50        let res: i32 = unsafe { blpapi_Session_stop(self.ptr) };
51
52        Error::check(res)
53    }
54
55    pub fn open_service(&mut self, service: &str) -> Result<(), Error> {
56        let service: CString = CString::new(service).unwrap();
57        let res: i32 = unsafe { blpapi_Session_openService(self.ptr, service.as_ptr()) };
58
59        Error::check(res)
60    }
61
62    pub fn get_service(&self, service: &str) -> Result<Service, Error> {
63        let name: CString = CString::new(service).unwrap();
64
65        let mut service: *mut blpapi_Service = ptr::null_mut();
66
67        let res: i32 =
68            unsafe { blpapi_Session_getService(self.ptr, &mut service as *mut _, name.as_ptr()) };
69
70        Error::check(res)?;
71
72        Ok(Service(service))
73    }
74
75    pub fn send(
76        &mut self,
77        request: Request,
78        correlation_id: Option<CorrelationId>,
79    ) -> Result<CorrelationId, Error> {
80        let mut correlation_id: CorrelationId =
81            correlation_id.unwrap_or_else(|| self.new_correlation_id());
82        let identity: *mut blpapi_Identity = ptr::null_mut();
83        let event_queue: *mut blpapi_EventQueue = ptr::null_mut();
84        let request_label: *mut i8 = ptr::null_mut();
85        let request_label_len: i32 = 0;
86
87        unsafe {
88            let res = blpapi_Session_sendRequest(
89                self.ptr,
90                request.ptr,
91                &mut correlation_id.0 as *mut _,
92                identity,
93                event_queue,
94                request_label,
95                request_label_len,
96            );
97
98            Error::check(res)?;
99
100            Ok(correlation_id)
101        }
102    }
103
104    fn new_correlation_id(&mut self) -> CorrelationId {
105        let id = CorrelationId::new_u64(self.correlation_count);
106
107        self.correlation_count += 1;
108
109        id
110    }
111}
112
113impl Drop for Session {
114    fn drop(&mut self) {
115        unsafe { blpapi_Session_destroy(self.ptr) }
116    }
117}
118
119pub struct SessionSync(Session);
120
121impl SessionSync {
122    pub fn from_options(options: SessionOptions) -> Self {
123        SessionSync(Session::from_options(options))
124    }
125
126    pub fn new() -> Result<Self, Error> {
127        let mut session = Self::from_options(SessionOptions::default());
128
129        session.start()?;
130
131        session.open_service("//blp/refdata")?;
132
133        Ok(session)
134    }
135
136    pub fn send(
137        &mut self,
138        request: Request,
139        correlation_id: Option<CorrelationId>,
140    ) -> Result<Events, Error> {
141        let _id = (&mut *self as &mut Session).send(request, correlation_id)?;
142
143        Ok(Events::new(self))
144    }
145
146    pub fn next_event(&mut self, timeout_ms: Option<u32>) -> Result<Event, Error> {
147        let mut event: *mut blpapi_Event = ptr::null_mut();
148
149        let timeout: u32 = timeout_ms.unwrap_or(0);
150
151        unsafe {
152            let res: i32 = blpapi_Session_nextEvent(self.0.ptr, &mut event as *mut _, timeout);
153
154            Error::check(res)?;
155
156            Ok(Event(event))
157        }
158    }
159
160    pub fn ref_data<I, R>(&mut self, securities: I) -> Result<HashMap<String, R>, Error>
161    where
162        I: IntoIterator,
163        I::Item: AsRef<str>,
164        R: RefData,
165    {
166        let service = self.get_service("//blp/refdata")?;
167        let mut ref_data: HashMap<String, R> = HashMap::new();
168        let mut iter: <I as IntoIterator>::IntoIter = securities.into_iter();
169
170        for fields in R::FIELDS.chunks(MAX_REFDATA_FIELDS) {
171            loop {
172                let mut request: Request = service.create_request("ReferenceDataRequest")?;
173
174                let mut is_empty: bool = true;
175
176                for security in iter.by_ref().take(MAX_PENDING_REQUEST / fields.len()) {
177                    request.append_named(&name::SECURITIES, security.as_ref())?;
178                    is_empty = false;
179                }
180
181                if is_empty {
182                    break;
183                }
184
185                for field in fields {
186                    request.append_named(&name::FIELDS_NAME, *field)?;
187                }
188
189                for event in self.send(request, None)? {
190                    for message in event?.messages().map(|m| m.element()) {
191                        if let Some(securities) = message.get_named_element(&name::SECURITY_DATA) {
192                            for security in securities.values::<Element>() {
193                                let ticker: String = security
194                                    .get_named_element(&name::SECURITY_NAME)
195                                    .and_then(|s: Element| s.get_at(0))
196                                    .unwrap_or_else(String::new);
197
198                                if let Some(error) =
199                                    security.get_named_element(&name::SECURITY_ERROR)
200                                {
201                                    return Err(Error::security(ticker, error));
202                                }
203
204                                let entry: &mut R = ref_data.entry(ticker).or_default();
205
206                                if let Some(fields) = security.get_named_element(&name::FIELD_DATA)
207                                {
208                                    for field in fields.elements() {
209                                        entry.on_field(&field.string_name(), &field);
210                                    }
211                                }
212                            }
213                        }
214                    }
215                }
216            }
217        }
218
219        Ok(ref_data)
220    }
221
222    pub fn hist_data<I, R>(
223        &mut self,
224        securities: I,
225        options: HistOptions,
226    ) -> Result<HashMap<String, TimeSerie<R>>, Error>
227    where
228        I: IntoIterator,
229        I::Item: AsRef<str>,
230        R: RefData,
231    {
232        let service = self.get_service("//blp/refdata")?;
233
234        let mut ref_data: HashMap<String, TimeSerie<R>> = HashMap::new();
235        let mut iter = securities.into_iter();
236
237        for fields in R::FIELDS.chunks(MAX_HISTDATA_FIELDS) {
238            loop {
239                let mut request = service.create_request("HistoricalDataRequest")?;
240                let mut is_empty = true;
241
242                for security in iter.by_ref().take(MAX_PENDING_REQUEST / fields.len()) {
243                    request.append_named(&name::SECURITIES, security.as_ref())?;
244                    is_empty = false;
245                }
246
247                if is_empty {
248                    break;
249                }
250
251                for field in fields {
252                    request.append_named(&name::FIELDS_NAME, *field)?;
253                }
254
255                options.apply(&mut request)?;
256
257                for event in self.send(request, None)? {
258                    for message in event?.messages().map(|m| m.element()) {
259                        if let Some(security) = message.get_named_element(&name::SECURITY_DATA) {
260                            let ticker: String = security
261                                .get_named_element(&name::SECURITY_NAME)
262                                .and_then(|s: Element| s.get_at(0))
263                                .unwrap_or_else(|| String::new());
264
265                            if security.has_named_element(&name::SECURITY_ERROR) {
266                                break;
267                            }
268
269                            if let Some(fields) = security.get_named_element(&name::FIELD_DATA) {
270                                let entry: &mut TimeSerie<R> =
271                                    ref_data.entry(ticker).or_insert_with(|| {
272                                        let len: usize = fields.num_values();
273
274                                        TimeSerie::<_>::with_capacity(len)
275                                    });
276
277                                for points in fields.values::<Element>() {
278                                    let mut value = R::default();
279
280                                    for field in points.elements() {
281                                        let name = &field.string_name();
282
283                                        if name == "date" {
284                                            // entry.dates.extend(field.get_at(0));
285                                        } else {
286                                            value.on_field(name, &field);
287                                        }
288                                    }
289
290                                    entry.values.push(value);
291                                }
292                            }
293                        }
294                    }
295                }
296            }
297        }
298
299        Ok(ref_data)
300    }
301}
302
303impl std::ops::Deref for SessionSync {
304    type Target = Session;
305    fn deref(&self) -> &Session {
306        &self.0
307    }
308}
309
310impl std::ops::DerefMut for SessionSync {
311    fn deref_mut(&mut self) -> &mut Session {
312        &mut self.0
313    }
314}
315
316pub struct Events<'a> {
317    session: &'a mut SessionSync,
318    exit: bool,
319}
320
321impl<'a> Events<'a> {
322    fn new(session: &'a mut SessionSync) -> Self {
323        Events {
324            session,
325            exit: false,
326        }
327    }
328
329    fn try_next(&mut self) -> Result<Option<Event>, Error> {
330        if self.exit {
331            return Ok(None);
332        }
333        loop {
334            let event: Event = self.session.next_event(None)?;
335            let event_type: EventType = event.event_type();
336
337            match event_type {
338                EventType::PartialResponse => return Ok(Some(event)),
339                EventType::Response => {
340                    self.exit = true;
341                    return Ok(Some(event));
342                }
343                EventType::SessionStatus => {
344                    if event.messages().map(|m| m.message_type()).any(|m| {
345                        m == *name::SESSION_TERMINATED || m == *name::SESSION_STARTUP_FAILURE
346                    }) {
347                        return Ok(None);
348                    }
349                }
350
351                EventType::Timeout => return Err(Error::TimeOut),
352
353                _ => (),
354            }
355        }
356    }
357}
358
359impl<'a> Iterator for Events<'a> {
360    type Item = Result<Event, Error>;
361
362    fn next(&mut self) -> Option<Result<Event, Error>> {
363        self.try_next().transpose()
364    }
365}
366
367#[derive(Debug, Default)]
368pub struct HistOptions {
369    start_date: String,
370    end_date: String,
371    periodicity_adjustment: Option<PeriodicityAdjustment>,
372    periodicity_selection: Option<PeriodicitySelection>,
373    max_data_points: Option<i32>,
374    currency: Option<String>,
375}
376
377impl HistOptions {
378    pub fn new<S: Into<String>, E: Into<String>>(start_date: S, end_date: E) -> Self {
379        HistOptions {
380            start_date: start_date.into(),
381            end_date: end_date.into(),
382            ..HistOptions::default()
383        }
384    }
385
386    pub fn with_periodicity_adjustment(
387        mut self,
388        periodicity_adjustment: PeriodicityAdjustment,
389    ) -> Self {
390        self.periodicity_adjustment = Some(periodicity_adjustment);
391        self
392    }
393
394    pub fn with_periodicity_selection(
395        mut self,
396        periodicity_selection: PeriodicitySelection,
397    ) -> Self {
398        self.periodicity_selection = Some(periodicity_selection);
399        self
400    }
401
402    pub fn with_max_points(mut self, max_data_points: i32) -> Self {
403        self.max_data_points = Some(max_data_points);
404        self
405    }
406
407    pub fn with_currency(mut self, currency: String) -> Self {
408        self.currency = Some(currency);
409        self
410    }
411
412    fn apply(&self, request: &mut Request) -> Result<(), Error> {
413        let mut element = request.element();
414
415        element.set("startDate", &self.start_date[..])?;
416        element.set("endDate", &self.end_date[..])?;
417
418        if let Some(periodicity_selection) = self.periodicity_selection {
419            element.set("periodicitySelection", periodicity_selection.as_str())?;
420        }
421
422        if let Some(periodicity_adjustment) = self.periodicity_adjustment {
423            element.set("periodicityAdjustment", periodicity_adjustment.as_str())?;
424        }
425
426        if let Some(max_data_points) = self.max_data_points {
427            element.set("maxDataPoints", max_data_points)?;
428        }
429
430        if let Some(currency) = self.currency.as_ref() {
431            element.set("currency", &**currency)?;
432        }
433
434        Ok(())
435    }
436}
437
438#[derive(Default, Debug)]
439pub struct TimeSerie<R> {
440    pub values: Vec<R>,
441    dates: Vec<chrono::NaiveDate>,
442}
443
444impl<R> TimeSerie<R> {
445    pub fn with_capacity(capacity: usize) -> Self {
446        TimeSerie {
447            dates: Vec::with_capacity(capacity),
448            values: Vec::with_capacity(capacity),
449        }
450    }
451}
452
453#[derive(Debug, Clone, Copy)]
454pub enum PeriodicityAdjustment {
455    Actual,
456    Calendar,
457    Fiscal,
458}
459
460impl PeriodicityAdjustment {
461    pub fn as_str(self) -> &'static str {
462        match self {
463            PeriodicityAdjustment::Actual => "ACTUAL",
464            PeriodicityAdjustment::Calendar => "CALENDAR",
465            PeriodicityAdjustment::Fiscal => "FISCAL",
466        }
467    }
468}
469
470#[derive(Debug, Clone, Copy)]
471pub enum PeriodicitySelection {
472    Daily,
473    Weekly,
474    Monthly,
475    Quarterly,
476    SemiAnnually,
477    Yearly,
478}
479
480impl PeriodicitySelection {
481    pub fn as_str(self) -> &'static str {
482        match self {
483            PeriodicitySelection::Daily => "DAILY",
484            PeriodicitySelection::Weekly => "WEEKLY",
485            PeriodicitySelection::Monthly => "MONTHLY",
486            PeriodicitySelection::Quarterly => "QUARTERLY",
487            PeriodicitySelection::SemiAnnually => "SEMIANNUALLY",
488            PeriodicitySelection::Yearly => "YEARLY",
489        }
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496
497    #[test]
498    fn send_request() -> Result<(), Error> {
499        let mut _session: SessionSync = SessionOptions::default()
500            .with_server_host("localhost")?
501            .with_server_port(8194)?
502            .sync();
503
504        Ok(())
505    }
506}