kunquant_rs/stream.rs
1use crate::error::{KunQuantError, Result};
2use crate::executor::Executor;
3use crate::ffi;
4use crate::library::Module;
5use std::collections::HashMap;
6use std::ffi::CString;
7
8/// A streaming computation context for real-time factor calculation.
9///
10/// `StreamContext` provides an interface for real-time factor computation using KunQuant's
11/// streaming engine. It maintains internal state for efficient buffer management and supports
12/// low-latency processing of market data streams.
13///
14/// # Lifetime Parameters
15///
16/// * `'a` - The lifetime of the executor and module references
17///
18/// # Thread Safety
19///
20/// This struct is not thread-safe. Each thread should create its own `StreamContext` instance.
21///
22/// # Memory Management
23///
24/// The streaming context automatically manages its resources using RAII. The underlying
25/// C handle is properly cleaned up when the context is dropped.
26pub struct StreamContext<'a> {
27 handle: ffi::KunStreamContextHandle,
28 num_stocks: usize,
29 _executor: &'a Executor,
30 _module: &'a Module<'a>,
31 // Cache buffer handles to avoid repeated lookups
32 buffer_handles: HashMap<String, usize>,
33}
34
35impl<'a> StreamContext<'a> {
36 /// Creates a new streaming context for real-time factor calculation.
37 ///
38 /// This function initializes a streaming context that can process market data
39 /// in real-time using the specified executor and factor module.
40 ///
41 /// # Arguments
42 ///
43 /// * `executor` - Reference to the KunQuant executor that will run the computations
44 /// * `module` - Reference to the compiled factor module containing the computation graph
45 /// * `num_stocks` - Number of stocks to process (must be a multiple of 8 for SIMD optimization)
46 ///
47 /// # Returns
48 ///
49 /// Returns `Ok(StreamContext)` on success, or an error if:
50 /// - The executor handle is invalid
51 /// - The module handle is invalid
52 /// - The streaming context creation fails in the C library
53 /// - `num_stocks` is not a multiple of 8
54 ///
55 /// # Examples
56 ///
57 /// ```rust,no_run
58 /// use kunquant_rs::{Executor, Library, StreamContext};
59 ///
60 /// # fn main() -> kunquant_rs::Result<()> {
61 /// let executor = Executor::single_thread()?;
62 /// let library = Library::load("factor_lib.so")?;
63 /// let module = library.get_module("my_factor")?;
64 ///
65 /// // Create streaming context for 16 stocks
66 /// let stream = StreamContext::new(&executor, &module, 16)?;
67 /// # Ok(())
68 /// # }
69 /// ```
70 ///
71 /// # Performance Notes
72 ///
73 /// - Buffer handles are cached internally for optimal performance
74 /// - The context reuses memory buffers across multiple time steps
75 /// - SIMD optimizations require `num_stocks` to be a multiple of 8
76 pub fn new(executor: &'a Executor, module: &'a Module<'a>, num_stocks: usize) -> Result<Self> {
77 if num_stocks % 8 != 0 {
78 return Err(KunQuantError::InvalidStockCount { num_stocks });
79 }
80
81 let handle =
82 unsafe { ffi::kunCreateStream(executor.handle(), module.handle(), num_stocks) };
83
84 if handle.is_null() {
85 return Err(KunQuantError::StreamCreationFailed);
86 }
87
88 Ok(StreamContext {
89 handle,
90 num_stocks,
91 _executor: executor,
92 _module: module,
93 buffer_handles: HashMap::new(),
94 })
95 }
96
97 /// Retrieves the buffer handle for a named input or output buffer.
98 ///
99 /// Buffer handles are used internally by KunQuant to efficiently identify and access
100 /// data buffers. This method caches handles to avoid repeated lookups, improving
101 /// performance in streaming scenarios.
102 ///
103 /// # Arguments
104 ///
105 /// * `name` - The name of the buffer as defined in the factor module. Can be any type
106 /// that implements `AsRef<str>` (e.g., `&str`, `String`, etc.)
107 ///
108 /// # Returns
109 ///
110 /// Returns `Ok(handle)` where `handle` is a numeric identifier for the buffer,
111 /// or an error if:
112 /// - The buffer name is not found in the module
113 /// - The streaming context handle is invalid
114 /// - The C library call fails
115 ///
116 /// # Examples
117 ///
118 /// ```rust,no_run
119 /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
120 /// # fn example(mut stream: StreamContext) -> Result<()> {
121 /// // Get handle for input buffer
122 /// let input_handle = stream.get_buffer_handle("close")?;
123 ///
124 /// // Get handle for output buffer
125 /// let output_handle = stream.get_buffer_handle("factor_output")?;
126 ///
127 /// // Works with String as well
128 /// let buffer_name = String::from("volume");
129 /// let volume_handle = stream.get_buffer_handle(buffer_name)?;
130 /// # Ok(())
131 /// # }
132 /// ```
133 ///
134 /// # Performance Notes
135 ///
136 /// - Handles are cached internally after first lookup
137 /// - Subsequent calls for the same buffer name return cached values
138 /// - This optimization is crucial for high-frequency streaming applications
139 /// - The cache persists for the lifetime of the `StreamContext`
140 pub fn get_buffer_handle<N: AsRef<str>>(&mut self, name: N) -> Result<usize> {
141 let name_str = name.as_ref();
142
143 if let Some(&handle) = self.buffer_handles.get(name_str) {
144 return Ok(handle);
145 }
146
147 let c_name = CString::new(name_str)?;
148 let handle = unsafe { ffi::kunQueryBufferHandle(self.handle, c_name.as_ptr()) };
149
150 // Note: KunQuant returns SIZE_MAX for invalid buffer names
151 if handle == usize::MAX {
152 return Err(KunQuantError::BufferHandleNotFound {
153 name: name_str.to_string(),
154 });
155 }
156
157 self.buffer_handles.insert(name_str.to_string(), handle);
158 Ok(handle)
159 }
160
161 /// Retrieves the current computed data from a named output buffer.
162 ///
163 /// After calling `run()`, this method provides access to the computed factor values
164 /// for the current time step. The returned slice contains values for all stocks.
165 ///
166 /// # Arguments
167 ///
168 /// * `name` - The name of the output buffer as defined in the factor module
169 ///
170 /// # Returns
171 ///
172 /// Returns `Ok(&[f32])` containing the computed values for all stocks, or an error if:
173 /// - The buffer name is not found
174 /// - The computation hasn't been run yet (call `run()` first)
175 /// - The streaming context handle is invalid
176 /// - The C library returns a null pointer
177 ///
178 /// # Examples
179 ///
180 /// ```rust,no_run
181 /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
182 /// # fn example(mut stream: StreamContext) -> Result<()> {
183 /// // Push input data
184 /// let prices = vec![100.0, 200.0, 150.0, 75.0, 300.0, 125.0, 90.0, 180.0];
185 /// stream.push_data("close", &prices)?;
186 ///
187 /// // Run computation
188 /// stream.run()?;
189 ///
190 /// // Get computed factor values
191 /// let factor_values = stream.get_current_buffer("my_factor")?;
192 /// println!("Factor values: {:?}", factor_values);
193 /// # Ok(())
194 /// # }
195 /// ```
196 ///
197 /// # Data Validity
198 ///
199 /// - The returned slice is valid until the next call to `push_data()` or `run()`
200 /// - Values may include NaN for stocks where computation is undefined
201 /// - The slice length always equals `num_stocks`
202 ///
203 /// # Safety Notes
204 ///
205 /// - The returned slice borrows from internal C buffers
206 /// - The lifetime is tied to the `StreamContext` instance
207 /// - Do not store references beyond the next streaming operation
208 pub fn get_current_buffer<N: AsRef<str>>(&mut self, name: N) -> Result<&[f32]> {
209 let handle = self.get_buffer_handle(name)?;
210 let ptr = unsafe { ffi::kunStreamGetCurrentBuffer(self.handle, handle) };
211
212 if ptr.is_null() {
213 return Err(KunQuantError::NullPointer);
214 }
215
216 Ok(unsafe { std::slice::from_raw_parts(ptr, self.num_stocks) })
217 }
218
219 /// Pushes new market data to a named input buffer for the current time step.
220 ///
221 /// This method feeds new data into the streaming computation pipeline. The data
222 /// represents values for all stocks at a single point in time (e.g., current prices,
223 /// volumes, etc.).
224 ///
225 /// # Arguments
226 ///
227 /// * `name` - The name of the input buffer as defined in the factor module
228 /// * `data` - Slice containing data for all stocks. Length must equal `num_stocks`
229 ///
230 /// # Returns
231 ///
232 /// Returns `Ok(())` on success, or an error if:
233 /// - The data length doesn't match the number of stocks
234 /// - The buffer name is not found
235 /// - The streaming context handle is invalid
236 /// - The C library call fails
237 ///
238 /// # Examples
239 ///
240 /// ```rust,no_run
241 /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
242 /// # fn example(mut stream: StreamContext) -> Result<()> {
243 /// // Push closing prices for 8 stocks
244 /// let close_prices = vec![100.5, 200.3, 150.7, 75.2, 300.1, 125.8, 90.4, 180.6];
245 /// stream.push_data("close", &close_prices)?;
246 ///
247 /// // Push volume data
248 /// let volumes = vec![1000.0, 2000.0, 1500.0, 800.0, 3000.0, 1200.0, 900.0, 1800.0];
249 /// stream.push_data("volume", &volumes)?;
250 /// # Ok(())
251 /// # }
252 /// ```
253 ///
254 /// # Data Requirements
255 ///
256 /// - Data must be provided for exactly `num_stocks` securities
257 /// - Values should be finite floating-point numbers
258 /// - NaN and infinite values may cause computation errors
259 /// - Data represents a single time point across all stocks
260 ///
261 /// # Performance Notes
262 ///
263 /// - Data is copied into internal buffers managed by KunQuant
264 /// - Buffer handles are cached for optimal performance
265 /// - This method is designed for high-frequency updates
266 pub fn push_data<N: AsRef<str>>(&mut self, name: N, data: &[f32]) -> Result<()> {
267 if data.len() != self.num_stocks {
268 return Err(KunQuantError::BufferSizeMismatch {
269 name: name.as_ref().to_string(),
270 expected: self.num_stocks,
271 actual: data.len(),
272 });
273 }
274
275 let handle = self.get_buffer_handle(name)?;
276 unsafe {
277 ffi::kunStreamPushData(self.handle, handle, data.as_ptr());
278 }
279 Ok(())
280 }
281
282 /// Executes the factor computation on the currently pushed data.
283 ///
284 /// This method triggers the execution of the factor computation graph using all
285 /// input data that has been pushed since the last `run()` call. After successful
286 /// execution, computed results can be retrieved using `get_current_buffer()`.
287 ///
288 /// # Returns
289 ///
290 /// Returns `Ok(())` on successful computation, or an error if:
291 /// - The streaming context handle is invalid
292 /// - Required input data hasn't been pushed
293 /// - The computation encounters runtime errors
294 /// - The C library execution fails
295 ///
296 /// # Examples
297 ///
298 /// ```rust,no_run
299 /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
300 /// # fn example(mut stream: StreamContext) -> Result<()> {
301 /// // Push all required input data
302 /// let close = vec![100.0, 200.0, 150.0, 75.0, 300.0, 125.0, 90.0, 180.0];
303 /// let open = vec![99.0, 199.0, 149.0, 74.0, 299.0, 124.0, 89.0, 179.0];
304 ///
305 /// stream.push_data("close", &close)?;
306 /// stream.push_data("open", &open)?;
307 ///
308 /// // Execute computation
309 /// stream.run()?;
310 ///
311 /// // Now results are available
312 /// let results = stream.get_current_buffer("output")?;
313 /// # Ok(())
314 /// # }
315 /// ```
316 ///
317 /// # Execution Model
318 ///
319 /// - Computation is performed synchronously
320 /// - All required inputs must be pushed before calling `run()`
321 /// - Results are immediately available after successful execution
322 /// - The method can be called repeatedly for streaming scenarios
323 ///
324 /// # Performance Notes
325 ///
326 /// - Optimized for low-latency execution
327 /// - Uses SIMD instructions when possible
328 /// - Memory buffers are reused between calls
329 /// - Execution time depends on factor complexity and number of stocks
330 pub fn run(&self) -> Result<()> {
331 if self.handle.is_null() {
332 return Err(KunQuantError::NullPointer);
333 }
334
335 unsafe {
336 ffi::kunStreamRun(self.handle);
337 }
338 Ok(())
339 }
340
341 /// Returns the number of stocks this streaming context is configured to process.
342 ///
343 /// This value is set during context creation and determines the expected length
344 /// of all input and output data arrays. It cannot be changed after creation.
345 ///
346 /// # Returns
347 ///
348 /// The number of stocks as specified when creating the streaming context.
349 ///
350 /// # Examples
351 ///
352 /// ```rust,no_run
353 /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
354 /// # fn example(stream: StreamContext) -> Result<()> {
355 /// let num_stocks = stream.num_stocks();
356 /// println!("Processing {} stocks", num_stocks);
357 ///
358 /// // Ensure input data has correct length
359 /// let prices = vec![100.0; num_stocks];
360 /// // stream.push_data("close", &prices)?;
361 /// # Ok(())
362 /// # }
363 /// ```
364 ///
365 /// # Usage Notes
366 ///
367 /// - All input data arrays must have exactly this length
368 /// - All output data arrays will have exactly this length
369 /// - For optimal SIMD performance, this should be a multiple of 8
370 /// - The value is immutable for the lifetime of the context
371 pub fn num_stocks(&self) -> usize {
372 self.num_stocks
373 }
374}
375
376impl<'a> Drop for StreamContext<'a> {
377 fn drop(&mut self) {
378 if !self.handle.is_null() {
379 unsafe {
380 ffi::kunDestoryStream(self.handle);
381 }
382 }
383 }
384}