Skip to main content

nautilus_serialization/arrow/
option_greeks.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19    array::{Array, Float64Array, Float64Builder, StringBuilder, UInt64Array},
20    datatypes::{DataType, Field, Schema},
21    error::ArrowError,
22    record_batch::RecordBatch,
23};
24use nautilus_model::{
25    data::{Data, greeks::OptionGreekValues, option_chain::OptionGreeks},
26    enums::GreeksConvention,
27    identifiers::InstrumentId,
28};
29
30use super::{
31    ArrowSchemaProvider, DecodeDataFromRecordBatch, DecodeFromRecordBatch, EncodeToRecordBatch,
32    EncodingError, KEY_INSTRUMENT_ID, extract_column, extract_column_string,
33};
34
35const TYPE_NAME: &str = "OptionGreeks";
36
37impl ArrowSchemaProvider for OptionGreeks {
38    fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
39        let fields = vec![
40            Field::new("instrument_id", DataType::Utf8, false),
41            Field::new("delta", DataType::Float64, false),
42            Field::new("gamma", DataType::Float64, false),
43            Field::new("vega", DataType::Float64, false),
44            Field::new("theta", DataType::Float64, false),
45            Field::new("rho", DataType::Float64, false),
46            Field::new("mark_iv", DataType::Float64, true),
47            Field::new("bid_iv", DataType::Float64, true),
48            Field::new("ask_iv", DataType::Float64, true),
49            Field::new("underlying_price", DataType::Float64, true),
50            Field::new("open_interest", DataType::Float64, true),
51            Field::new("ts_event", DataType::UInt64, false),
52            Field::new("ts_init", DataType::UInt64, false),
53            Field::new("convention", DataType::Utf8, false),
54        ];
55
56        let mut metadata = metadata.unwrap_or_default();
57        metadata.insert("type".to_string(), TYPE_NAME.to_string());
58        Schema::new_with_metadata(fields, metadata)
59    }
60}
61
62fn append_optional_f64(builder: &mut Float64Builder, value: Option<f64>) {
63    match value {
64        Some(value) => builder.append_value(value),
65        None => builder.append_null(),
66    }
67}
68
69fn optional_f64(values: &Float64Array, row: usize) -> Option<f64> {
70    if values.is_null(row) {
71        None
72    } else {
73        Some(values.value(row))
74    }
75}
76
77impl EncodeToRecordBatch for OptionGreeks {
78    fn encode_batch(
79        metadata: &HashMap<String, String>,
80        data: &[Self],
81    ) -> Result<RecordBatch, ArrowError> {
82        let mut instrument_id_builder = StringBuilder::new();
83        let mut delta_builder = Float64Builder::new();
84        let mut gamma_builder = Float64Builder::new();
85        let mut vega_builder = Float64Builder::new();
86        let mut theta_builder = Float64Builder::new();
87        let mut rho_builder = Float64Builder::new();
88        let mut mark_iv_builder = Float64Builder::new();
89        let mut bid_iv_builder = Float64Builder::new();
90        let mut ask_iv_builder = Float64Builder::new();
91        let mut underlying_price_builder = Float64Builder::new();
92        let mut open_interest_builder = Float64Builder::new();
93        let mut ts_event_builder = UInt64Array::builder(data.len());
94        let mut ts_init_builder = UInt64Array::builder(data.len());
95        let mut convention_builder = StringBuilder::new();
96
97        for greeks in data {
98            instrument_id_builder.append_value(greeks.instrument_id.to_string());
99            delta_builder.append_value(greeks.delta);
100            gamma_builder.append_value(greeks.gamma);
101            vega_builder.append_value(greeks.vega);
102            theta_builder.append_value(greeks.theta);
103            rho_builder.append_value(greeks.rho);
104            append_optional_f64(&mut mark_iv_builder, greeks.mark_iv);
105            append_optional_f64(&mut bid_iv_builder, greeks.bid_iv);
106            append_optional_f64(&mut ask_iv_builder, greeks.ask_iv);
107            append_optional_f64(&mut underlying_price_builder, greeks.underlying_price);
108            append_optional_f64(&mut open_interest_builder, greeks.open_interest);
109            ts_event_builder.append_value(greeks.ts_event.as_u64());
110            ts_init_builder.append_value(greeks.ts_init.as_u64());
111            convention_builder.append_value(greeks.convention);
112        }
113
114        RecordBatch::try_new(
115            Arc::new(Self::get_schema(Some(metadata.clone()))),
116            vec![
117                Arc::new(instrument_id_builder.finish()),
118                Arc::new(delta_builder.finish()),
119                Arc::new(gamma_builder.finish()),
120                Arc::new(vega_builder.finish()),
121                Arc::new(theta_builder.finish()),
122                Arc::new(rho_builder.finish()),
123                Arc::new(mark_iv_builder.finish()),
124                Arc::new(bid_iv_builder.finish()),
125                Arc::new(ask_iv_builder.finish()),
126                Arc::new(underlying_price_builder.finish()),
127                Arc::new(open_interest_builder.finish()),
128                Arc::new(ts_event_builder.finish()),
129                Arc::new(ts_init_builder.finish()),
130                Arc::new(convention_builder.finish()),
131            ],
132        )
133    }
134
135    fn metadata(&self) -> HashMap<String, String> {
136        HashMap::from([
137            ("type".to_string(), TYPE_NAME.to_string()),
138            (
139                KEY_INSTRUMENT_ID.to_string(),
140                self.instrument_id.to_string(),
141            ),
142        ])
143    }
144}
145
146impl DecodeFromRecordBatch for OptionGreeks {
147    fn decode_batch(
148        _metadata: &HashMap<String, String>,
149        record_batch: RecordBatch,
150    ) -> Result<Vec<Self>, EncodingError> {
151        let cols = record_batch.columns();
152
153        let instrument_id_values = extract_column_string(cols, "instrument_id", 0)?;
154        let delta_values = extract_column::<Float64Array>(cols, "delta", 1, DataType::Float64)?;
155        let gamma_values = extract_column::<Float64Array>(cols, "gamma", 2, DataType::Float64)?;
156        let vega_values = extract_column::<Float64Array>(cols, "vega", 3, DataType::Float64)?;
157        let theta_values = extract_column::<Float64Array>(cols, "theta", 4, DataType::Float64)?;
158        let rho_values = extract_column::<Float64Array>(cols, "rho", 5, DataType::Float64)?;
159        let mark_iv_values = extract_column::<Float64Array>(cols, "mark_iv", 6, DataType::Float64)?;
160        let bid_iv_values = extract_column::<Float64Array>(cols, "bid_iv", 7, DataType::Float64)?;
161        let ask_iv_values = extract_column::<Float64Array>(cols, "ask_iv", 8, DataType::Float64)?;
162        let underlying_price_values =
163            extract_column::<Float64Array>(cols, "underlying_price", 9, DataType::Float64)?;
164        let open_interest_values =
165            extract_column::<Float64Array>(cols, "open_interest", 10, DataType::Float64)?;
166        let ts_event_values =
167            extract_column::<UInt64Array>(cols, "ts_event", 11, DataType::UInt64)?;
168        let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 12, DataType::UInt64)?;
169        let convention_values = extract_column_string(cols, "convention", 13)?;
170
171        let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
172            .map(|row| {
173                let instrument_id = InstrumentId::from_str(instrument_id_values.value(row))
174                    .map_err(|e| EncodingError::ParseError("instrument_id", e.to_string()))?;
175                let convention = GreeksConvention::from_str(convention_values.value(row))
176                    .map_err(|e| EncodingError::ParseError("convention", e.to_string()))?;
177
178                Ok(Self {
179                    instrument_id,
180                    convention,
181                    greeks: OptionGreekValues {
182                        delta: delta_values.value(row),
183                        gamma: gamma_values.value(row),
184                        vega: vega_values.value(row),
185                        theta: theta_values.value(row),
186                        rho: rho_values.value(row),
187                    },
188                    mark_iv: optional_f64(mark_iv_values, row),
189                    bid_iv: optional_f64(bid_iv_values, row),
190                    ask_iv: optional_f64(ask_iv_values, row),
191                    underlying_price: optional_f64(underlying_price_values, row),
192                    open_interest: optional_f64(open_interest_values, row),
193                    ts_event: ts_event_values.value(row).into(),
194                    ts_init: ts_init_values.value(row).into(),
195                })
196            })
197            .collect();
198
199        result
200    }
201}
202
203impl DecodeDataFromRecordBatch for OptionGreeks {
204    fn decode_data_batch(
205        metadata: &HashMap<String, String>,
206        record_batch: RecordBatch,
207    ) -> Result<Vec<Data>, EncodingError> {
208        let greeks = Self::decode_batch(metadata, record_batch)?;
209        Ok(greeks.into_iter().map(Data::from).collect())
210    }
211}
212
213#[cfg(test)]
214mod tests {
215    use nautilus_model::{enums::GreeksConvention, identifiers::InstrumentId};
216    use rstest::rstest;
217
218    use super::*;
219
220    #[rstest]
221    fn test_encode_decode_round_trip() {
222        let instrument_id = InstrumentId::from("BTC-20260529-100000-C.OKX");
223        let original = vec![
224            OptionGreeks {
225                instrument_id,
226                convention: GreeksConvention::BlackScholes,
227                greeks: OptionGreekValues {
228                    delta: 0.55,
229                    gamma: 0.012,
230                    vega: 3.4,
231                    theta: -1.2,
232                    rho: 0.01,
233                },
234                mark_iv: Some(0.64),
235                bid_iv: Some(0.62),
236                ask_iv: Some(0.66),
237                underlying_price: Some(100_000.0),
238                open_interest: Some(42.0),
239                ts_event: 1.into(),
240                ts_init: 2.into(),
241            },
242            OptionGreeks {
243                instrument_id,
244                convention: GreeksConvention::PriceAdjusted,
245                greeks: OptionGreekValues {
246                    delta: 0.42,
247                    gamma: 0.009,
248                    vega: 2.9,
249                    theta: -0.9,
250                    rho: 0.02,
251                },
252                mark_iv: None,
253                bid_iv: None,
254                ask_iv: None,
255                underlying_price: None,
256                open_interest: None,
257                ts_event: 3.into(),
258                ts_init: 4.into(),
259            },
260        ];
261
262        let metadata = original[0].metadata();
263        let record_batch = OptionGreeks::encode_batch(&metadata, &original).unwrap();
264        let decoded = OptionGreeks::decode_batch(&metadata, record_batch).unwrap();
265
266        assert_eq!(decoded, original);
267    }
268}