1use std::{collections::HashMap, str::FromStr, sync::Arc};
17
18use arrow::{
19 array::{Array, Float64Array, Float64Builder, StringBuilder, UInt64Array},
20 datatypes::{DataType, Field, Schema},
21 error::ArrowError,
22 record_batch::RecordBatch,
23};
24use nautilus_model::{
25 data::{Data, greeks::OptionGreekValues, option_chain::OptionGreeks},
26 enums::GreeksConvention,
27 identifiers::InstrumentId,
28};
29
30use super::{
31 ArrowSchemaProvider, DecodeDataFromRecordBatch, DecodeFromRecordBatch, EncodeToRecordBatch,
32 EncodingError, KEY_INSTRUMENT_ID, extract_column, extract_column_string,
33};
34
/// Value stored under the `"type"` metadata key for `OptionGreeks` schemas and batches.
const TYPE_NAME: &str = "OptionGreeks";
36
37impl ArrowSchemaProvider for OptionGreeks {
38 fn get_schema(metadata: Option<HashMap<String, String>>) -> Schema {
39 let fields = vec![
40 Field::new("instrument_id", DataType::Utf8, false),
41 Field::new("delta", DataType::Float64, false),
42 Field::new("gamma", DataType::Float64, false),
43 Field::new("vega", DataType::Float64, false),
44 Field::new("theta", DataType::Float64, false),
45 Field::new("rho", DataType::Float64, false),
46 Field::new("mark_iv", DataType::Float64, true),
47 Field::new("bid_iv", DataType::Float64, true),
48 Field::new("ask_iv", DataType::Float64, true),
49 Field::new("underlying_price", DataType::Float64, true),
50 Field::new("open_interest", DataType::Float64, true),
51 Field::new("ts_event", DataType::UInt64, false),
52 Field::new("ts_init", DataType::UInt64, false),
53 Field::new("convention", DataType::Utf8, false),
54 ];
55
56 let mut metadata = metadata.unwrap_or_default();
57 metadata.insert("type".to_string(), TYPE_NAME.to_string());
58 Schema::new_with_metadata(fields, metadata)
59 }
60}
61
62fn append_optional_f64(builder: &mut Float64Builder, value: Option<f64>) {
63 match value {
64 Some(value) => builder.append_value(value),
65 None => builder.append_null(),
66 }
67}
68
69fn optional_f64(values: &Float64Array, row: usize) -> Option<f64> {
70 if values.is_null(row) {
71 None
72 } else {
73 Some(values.value(row))
74 }
75}
76
77impl EncodeToRecordBatch for OptionGreeks {
78 fn encode_batch(
79 metadata: &HashMap<String, String>,
80 data: &[Self],
81 ) -> Result<RecordBatch, ArrowError> {
82 let mut instrument_id_builder = StringBuilder::new();
83 let mut delta_builder = Float64Builder::new();
84 let mut gamma_builder = Float64Builder::new();
85 let mut vega_builder = Float64Builder::new();
86 let mut theta_builder = Float64Builder::new();
87 let mut rho_builder = Float64Builder::new();
88 let mut mark_iv_builder = Float64Builder::new();
89 let mut bid_iv_builder = Float64Builder::new();
90 let mut ask_iv_builder = Float64Builder::new();
91 let mut underlying_price_builder = Float64Builder::new();
92 let mut open_interest_builder = Float64Builder::new();
93 let mut ts_event_builder = UInt64Array::builder(data.len());
94 let mut ts_init_builder = UInt64Array::builder(data.len());
95 let mut convention_builder = StringBuilder::new();
96
97 for greeks in data {
98 instrument_id_builder.append_value(greeks.instrument_id.to_string());
99 delta_builder.append_value(greeks.delta);
100 gamma_builder.append_value(greeks.gamma);
101 vega_builder.append_value(greeks.vega);
102 theta_builder.append_value(greeks.theta);
103 rho_builder.append_value(greeks.rho);
104 append_optional_f64(&mut mark_iv_builder, greeks.mark_iv);
105 append_optional_f64(&mut bid_iv_builder, greeks.bid_iv);
106 append_optional_f64(&mut ask_iv_builder, greeks.ask_iv);
107 append_optional_f64(&mut underlying_price_builder, greeks.underlying_price);
108 append_optional_f64(&mut open_interest_builder, greeks.open_interest);
109 ts_event_builder.append_value(greeks.ts_event.as_u64());
110 ts_init_builder.append_value(greeks.ts_init.as_u64());
111 convention_builder.append_value(greeks.convention);
112 }
113
114 RecordBatch::try_new(
115 Arc::new(Self::get_schema(Some(metadata.clone()))),
116 vec![
117 Arc::new(instrument_id_builder.finish()),
118 Arc::new(delta_builder.finish()),
119 Arc::new(gamma_builder.finish()),
120 Arc::new(vega_builder.finish()),
121 Arc::new(theta_builder.finish()),
122 Arc::new(rho_builder.finish()),
123 Arc::new(mark_iv_builder.finish()),
124 Arc::new(bid_iv_builder.finish()),
125 Arc::new(ask_iv_builder.finish()),
126 Arc::new(underlying_price_builder.finish()),
127 Arc::new(open_interest_builder.finish()),
128 Arc::new(ts_event_builder.finish()),
129 Arc::new(ts_init_builder.finish()),
130 Arc::new(convention_builder.finish()),
131 ],
132 )
133 }
134
135 fn metadata(&self) -> HashMap<String, String> {
136 HashMap::from([
137 ("type".to_string(), TYPE_NAME.to_string()),
138 (
139 KEY_INSTRUMENT_ID.to_string(),
140 self.instrument_id.to_string(),
141 ),
142 ])
143 }
144}
145
146impl DecodeFromRecordBatch for OptionGreeks {
147 fn decode_batch(
148 _metadata: &HashMap<String, String>,
149 record_batch: RecordBatch,
150 ) -> Result<Vec<Self>, EncodingError> {
151 let cols = record_batch.columns();
152
153 let instrument_id_values = extract_column_string(cols, "instrument_id", 0)?;
154 let delta_values = extract_column::<Float64Array>(cols, "delta", 1, DataType::Float64)?;
155 let gamma_values = extract_column::<Float64Array>(cols, "gamma", 2, DataType::Float64)?;
156 let vega_values = extract_column::<Float64Array>(cols, "vega", 3, DataType::Float64)?;
157 let theta_values = extract_column::<Float64Array>(cols, "theta", 4, DataType::Float64)?;
158 let rho_values = extract_column::<Float64Array>(cols, "rho", 5, DataType::Float64)?;
159 let mark_iv_values = extract_column::<Float64Array>(cols, "mark_iv", 6, DataType::Float64)?;
160 let bid_iv_values = extract_column::<Float64Array>(cols, "bid_iv", 7, DataType::Float64)?;
161 let ask_iv_values = extract_column::<Float64Array>(cols, "ask_iv", 8, DataType::Float64)?;
162 let underlying_price_values =
163 extract_column::<Float64Array>(cols, "underlying_price", 9, DataType::Float64)?;
164 let open_interest_values =
165 extract_column::<Float64Array>(cols, "open_interest", 10, DataType::Float64)?;
166 let ts_event_values =
167 extract_column::<UInt64Array>(cols, "ts_event", 11, DataType::UInt64)?;
168 let ts_init_values = extract_column::<UInt64Array>(cols, "ts_init", 12, DataType::UInt64)?;
169 let convention_values = extract_column_string(cols, "convention", 13)?;
170
171 let result: Result<Vec<Self>, EncodingError> = (0..record_batch.num_rows())
172 .map(|row| {
173 let instrument_id = InstrumentId::from_str(instrument_id_values.value(row))
174 .map_err(|e| EncodingError::ParseError("instrument_id", e.to_string()))?;
175 let convention = GreeksConvention::from_str(convention_values.value(row))
176 .map_err(|e| EncodingError::ParseError("convention", e.to_string()))?;
177
178 Ok(Self {
179 instrument_id,
180 convention,
181 greeks: OptionGreekValues {
182 delta: delta_values.value(row),
183 gamma: gamma_values.value(row),
184 vega: vega_values.value(row),
185 theta: theta_values.value(row),
186 rho: rho_values.value(row),
187 },
188 mark_iv: optional_f64(mark_iv_values, row),
189 bid_iv: optional_f64(bid_iv_values, row),
190 ask_iv: optional_f64(ask_iv_values, row),
191 underlying_price: optional_f64(underlying_price_values, row),
192 open_interest: optional_f64(open_interest_values, row),
193 ts_event: ts_event_values.value(row).into(),
194 ts_init: ts_init_values.value(row).into(),
195 })
196 })
197 .collect();
198
199 result
200 }
201}
202
203impl DecodeDataFromRecordBatch for OptionGreeks {
204 fn decode_data_batch(
205 metadata: &HashMap<String, String>,
206 record_batch: RecordBatch,
207 ) -> Result<Vec<Data>, EncodingError> {
208 let greeks = Self::decode_batch(metadata, record_batch)?;
209 Ok(greeks.into_iter().map(Data::from).collect())
210 }
211}
212
#[cfg(test)]
mod tests {
    use nautilus_model::{enums::GreeksConvention, identifiers::InstrumentId};
    use rstest::rstest;

    use super::*;

    /// Sample row with every optional field populated.
    fn populated_greeks(instrument_id: InstrumentId) -> OptionGreeks {
        OptionGreeks {
            instrument_id,
            convention: GreeksConvention::BlackScholes,
            greeks: OptionGreekValues {
                delta: 0.55,
                gamma: 0.012,
                vega: 3.4,
                theta: -1.2,
                rho: 0.01,
            },
            mark_iv: Some(0.64),
            bid_iv: Some(0.62),
            ask_iv: Some(0.66),
            underlying_price: Some(100_000.0),
            open_interest: Some(42.0),
            ts_event: 1.into(),
            ts_init: 2.into(),
        }
    }

    /// Sample row with every optional field left as `None`, exercising the null path.
    fn sparse_greeks(instrument_id: InstrumentId) -> OptionGreeks {
        OptionGreeks {
            instrument_id,
            convention: GreeksConvention::PriceAdjusted,
            greeks: OptionGreekValues {
                delta: 0.42,
                gamma: 0.009,
                vega: 2.9,
                theta: -0.9,
                rho: 0.02,
            },
            mark_iv: None,
            bid_iv: None,
            ask_iv: None,
            underlying_price: None,
            open_interest: None,
            ts_event: 3.into(),
            ts_init: 4.into(),
        }
    }

    #[rstest]
    fn test_encode_decode_round_trip() {
        let instrument_id = InstrumentId::from("BTC-20260529-100000-C.OKX");
        let expected = vec![
            populated_greeks(instrument_id),
            sparse_greeks(instrument_id),
        ];

        let metadata = expected[0].metadata();
        let record_batch = OptionGreeks::encode_batch(&metadata, &expected).unwrap();
        let decoded = OptionGreeks::decode_batch(&metadata, record_batch).unwrap();

        assert_eq!(decoded, expected);
    }
}