Skip to main content

kunquant_rs/
stream.rs

1use crate::error::{KunQuantError, Result};
2use crate::executor::Executor;
3use crate::ffi;
4use crate::library::Module;
5use std::collections::HashMap;
6use std::ffi::CString;
7
8/// A streaming computation context for real-time factor calculation.
9///
10/// `StreamContext` provides an interface for real-time factor computation using KunQuant's
11/// streaming engine. It maintains internal state for efficient buffer management and supports
12/// low-latency processing of market data streams.
13///
14/// # Lifetime Parameters
15///
16/// * `'a` - The lifetime of the executor and module references
17///
18/// # Thread Safety
19///
20/// This struct is not thread-safe. Each thread should create its own `StreamContext` instance.
21///
22/// # Memory Management
23///
24/// The streaming context automatically manages its resources using RAII. The underlying
25/// C handle is properly cleaned up when the context is dropped.
26pub struct StreamContext<'a> {
27    handle: ffi::KunStreamContextHandle,
28    num_stocks: usize,
29    _executor: &'a Executor,
30    _module: &'a Module<'a>,
31    // Cache buffer handles to avoid repeated lookups
32    buffer_handles: HashMap<String, usize>,
33}
34
35impl<'a> StreamContext<'a> {
36    /// Creates a new streaming context for real-time factor calculation.
37    ///
38    /// This function initializes a streaming context that can process market data
39    /// in real-time using the specified executor and factor module.
40    ///
41    /// # Arguments
42    ///
43    /// * `executor` - Reference to the KunQuant executor that will run the computations
44    /// * `module` - Reference to the compiled factor module containing the computation graph
45    /// * `num_stocks` - Number of stocks to process (must be a multiple of 8 for SIMD optimization)
46    ///
47    /// # Returns
48    ///
49    /// Returns `Ok(StreamContext)` on success, or an error if:
50    /// - The executor handle is invalid
51    /// - The module handle is invalid
52    /// - The streaming context creation fails in the C library
53    /// - `num_stocks` is not a multiple of 8
54    ///
55    /// # Examples
56    ///
57    /// ```rust,no_run
58    /// use kunquant_rs::{Executor, Library, StreamContext};
59    ///
60    /// # fn main() -> kunquant_rs::Result<()> {
61    /// let executor = Executor::single_thread()?;
62    /// let library = Library::load("factor_lib.so")?;
63    /// let module = library.get_module("my_factor")?;
64    ///
65    /// // Create streaming context for 16 stocks
66    /// let stream = StreamContext::new(&executor, &module, 16)?;
67    /// # Ok(())
68    /// # }
69    /// ```
70    ///
71    /// # Performance Notes
72    ///
73    /// - Buffer handles are cached internally for optimal performance
74    /// - The context reuses memory buffers across multiple time steps
75    /// - SIMD optimizations require `num_stocks` to be a multiple of 8
76    pub fn new(executor: &'a Executor, module: &'a Module<'a>, num_stocks: usize) -> Result<Self> {
77        if num_stocks % 8 != 0 {
78            return Err(KunQuantError::InvalidStockCount { num_stocks });
79        }
80
81        let handle =
82            unsafe { ffi::kunCreateStream(executor.handle(), module.handle(), num_stocks) };
83
84        if handle.is_null() {
85            return Err(KunQuantError::StreamCreationFailed);
86        }
87
88        Ok(StreamContext {
89            handle,
90            num_stocks,
91            _executor: executor,
92            _module: module,
93            buffer_handles: HashMap::new(),
94        })
95    }
96
97    /// Retrieves the buffer handle for a named input or output buffer.
98    ///
99    /// Buffer handles are used internally by KunQuant to efficiently identify and access
100    /// data buffers. This method caches handles to avoid repeated lookups, improving
101    /// performance in streaming scenarios.
102    ///
103    /// # Arguments
104    ///
105    /// * `name` - The name of the buffer as defined in the factor module. Can be any type
106    ///           that implements `AsRef<str>` (e.g., `&str`, `String`, etc.)
107    ///
108    /// # Returns
109    ///
110    /// Returns `Ok(handle)` where `handle` is a numeric identifier for the buffer,
111    /// or an error if:
112    /// - The buffer name is not found in the module
113    /// - The streaming context handle is invalid
114    /// - The C library call fails
115    ///
116    /// # Examples
117    ///
118    /// ```rust,no_run
119    /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
120    /// # fn example(mut stream: StreamContext) -> Result<()> {
121    /// // Get handle for input buffer
122    /// let input_handle = stream.get_buffer_handle("close")?;
123    ///
124    /// // Get handle for output buffer
125    /// let output_handle = stream.get_buffer_handle("factor_output")?;
126    ///
127    /// // Works with String as well
128    /// let buffer_name = String::from("volume");
129    /// let volume_handle = stream.get_buffer_handle(buffer_name)?;
130    /// # Ok(())
131    /// # }
132    /// ```
133    ///
134    /// # Performance Notes
135    ///
136    /// - Handles are cached internally after first lookup
137    /// - Subsequent calls for the same buffer name return cached values
138    /// - This optimization is crucial for high-frequency streaming applications
139    /// - The cache persists for the lifetime of the `StreamContext`
140    pub fn get_buffer_handle<N: AsRef<str>>(&mut self, name: N) -> Result<usize> {
141        let name_str = name.as_ref();
142
143        if let Some(&handle) = self.buffer_handles.get(name_str) {
144            return Ok(handle);
145        }
146
147        let c_name = CString::new(name_str)?;
148        let handle = unsafe { ffi::kunQueryBufferHandle(self.handle, c_name.as_ptr()) };
149
150        // Note: KunQuant returns SIZE_MAX for invalid buffer names
151        if handle == usize::MAX {
152            return Err(KunQuantError::BufferHandleNotFound {
153                name: name_str.to_string(),
154            });
155        }
156
157        self.buffer_handles.insert(name_str.to_string(), handle);
158        Ok(handle)
159    }
160
161    /// Retrieves the current computed data from a named output buffer.
162    ///
163    /// After calling `run()`, this method provides access to the computed factor values
164    /// for the current time step. The returned slice contains values for all stocks.
165    ///
166    /// # Arguments
167    ///
168    /// * `name` - The name of the output buffer as defined in the factor module
169    ///
170    /// # Returns
171    ///
172    /// Returns `Ok(&[f32])` containing the computed values for all stocks, or an error if:
173    /// - The buffer name is not found
174    /// - The computation hasn't been run yet (call `run()` first)
175    /// - The streaming context handle is invalid
176    /// - The C library returns a null pointer
177    ///
178    /// # Examples
179    ///
180    /// ```rust,no_run
181    /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
182    /// # fn example(mut stream: StreamContext) -> Result<()> {
183    /// // Push input data
184    /// let prices = vec![100.0, 200.0, 150.0, 75.0, 300.0, 125.0, 90.0, 180.0];
185    /// stream.push_data("close", &prices)?;
186    ///
187    /// // Run computation
188    /// stream.run()?;
189    ///
190    /// // Get computed factor values
191    /// let factor_values = stream.get_current_buffer("my_factor")?;
192    /// println!("Factor values: {:?}", factor_values);
193    /// # Ok(())
194    /// # }
195    /// ```
196    ///
197    /// # Data Validity
198    ///
199    /// - The returned slice is valid until the next call to `push_data()` or `run()`
200    /// - Values may include NaN for stocks where computation is undefined
201    /// - The slice length always equals `num_stocks`
202    ///
203    /// # Safety Notes
204    ///
205    /// - The returned slice borrows from internal C buffers
206    /// - The lifetime is tied to the `StreamContext` instance
207    /// - Do not store references beyond the next streaming operation
208    pub fn get_current_buffer<N: AsRef<str>>(&mut self, name: N) -> Result<&[f32]> {
209        let handle = self.get_buffer_handle(name)?;
210        let ptr = unsafe { ffi::kunStreamGetCurrentBuffer(self.handle, handle) };
211
212        if ptr.is_null() {
213            return Err(KunQuantError::NullPointer);
214        }
215
216        Ok(unsafe { std::slice::from_raw_parts(ptr, self.num_stocks) })
217    }
218
219    /// Pushes new market data to a named input buffer for the current time step.
220    ///
221    /// This method feeds new data into the streaming computation pipeline. The data
222    /// represents values for all stocks at a single point in time (e.g., current prices,
223    /// volumes, etc.).
224    ///
225    /// # Arguments
226    ///
227    /// * `name` - The name of the input buffer as defined in the factor module
228    /// * `data` - Slice containing data for all stocks. Length must equal `num_stocks`
229    ///
230    /// # Returns
231    ///
232    /// Returns `Ok(())` on success, or an error if:
233    /// - The data length doesn't match the number of stocks
234    /// - The buffer name is not found
235    /// - The streaming context handle is invalid
236    /// - The C library call fails
237    ///
238    /// # Examples
239    ///
240    /// ```rust,no_run
241    /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
242    /// # fn example(mut stream: StreamContext) -> Result<()> {
243    /// // Push closing prices for 8 stocks
244    /// let close_prices = vec![100.5, 200.3, 150.7, 75.2, 300.1, 125.8, 90.4, 180.6];
245    /// stream.push_data("close", &close_prices)?;
246    ///
247    /// // Push volume data
248    /// let volumes = vec![1000.0, 2000.0, 1500.0, 800.0, 3000.0, 1200.0, 900.0, 1800.0];
249    /// stream.push_data("volume", &volumes)?;
250    /// # Ok(())
251    /// # }
252    /// ```
253    ///
254    /// # Data Requirements
255    ///
256    /// - Data must be provided for exactly `num_stocks` securities
257    /// - Values should be finite floating-point numbers
258    /// - NaN and infinite values may cause computation errors
259    /// - Data represents a single time point across all stocks
260    ///
261    /// # Performance Notes
262    ///
263    /// - Data is copied into internal buffers managed by KunQuant
264    /// - Buffer handles are cached for optimal performance
265    /// - This method is designed for high-frequency updates
266    pub fn push_data<N: AsRef<str>>(&mut self, name: N, data: &[f32]) -> Result<()> {
267        if data.len() != self.num_stocks {
268            return Err(KunQuantError::BufferSizeMismatch {
269                name: name.as_ref().to_string(),
270                expected: self.num_stocks,
271                actual: data.len(),
272            });
273        }
274
275        let handle = self.get_buffer_handle(name)?;
276        unsafe {
277            ffi::kunStreamPushData(self.handle, handle, data.as_ptr());
278        }
279        Ok(())
280    }
281
282    /// Executes the factor computation on the currently pushed data.
283    ///
284    /// This method triggers the execution of the factor computation graph using all
285    /// input data that has been pushed since the last `run()` call. After successful
286    /// execution, computed results can be retrieved using `get_current_buffer()`.
287    ///
288    /// # Returns
289    ///
290    /// Returns `Ok(())` on successful computation, or an error if:
291    /// - The streaming context handle is invalid
292    /// - Required input data hasn't been pushed
293    /// - The computation encounters runtime errors
294    /// - The C library execution fails
295    ///
296    /// # Examples
297    ///
298    /// ```rust,no_run
299    /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
300    /// # fn example(mut stream: StreamContext) -> Result<()> {
301    /// // Push all required input data
302    /// let close = vec![100.0, 200.0, 150.0, 75.0, 300.0, 125.0, 90.0, 180.0];
303    /// let open = vec![99.0, 199.0, 149.0, 74.0, 299.0, 124.0, 89.0, 179.0];
304    ///
305    /// stream.push_data("close", &close)?;
306    /// stream.push_data("open", &open)?;
307    ///
308    /// // Execute computation
309    /// stream.run()?;
310    ///
311    /// // Now results are available
312    /// let results = stream.get_current_buffer("output")?;
313    /// # Ok(())
314    /// # }
315    /// ```
316    ///
317    /// # Execution Model
318    ///
319    /// - Computation is performed synchronously
320    /// - All required inputs must be pushed before calling `run()`
321    /// - Results are immediately available after successful execution
322    /// - The method can be called repeatedly for streaming scenarios
323    ///
324    /// # Performance Notes
325    ///
326    /// - Optimized for low-latency execution
327    /// - Uses SIMD instructions when possible
328    /// - Memory buffers are reused between calls
329    /// - Execution time depends on factor complexity and number of stocks
330    pub fn run(&self) -> Result<()> {
331        if self.handle.is_null() {
332            return Err(KunQuantError::NullPointer);
333        }
334
335        unsafe {
336            ffi::kunStreamRun(self.handle);
337        }
338        Ok(())
339    }
340
341    /// Returns the number of stocks this streaming context is configured to process.
342    ///
343    /// This value is set during context creation and determines the expected length
344    /// of all input and output data arrays. It cannot be changed after creation.
345    ///
346    /// # Returns
347    ///
348    /// The number of stocks as specified when creating the streaming context.
349    ///
350    /// # Examples
351    ///
352    /// ```rust,no_run
353    /// # use kunquant_rs::{Executor, Library, StreamContext, Result};
354    /// # fn example(stream: StreamContext) -> Result<()> {
355    /// let num_stocks = stream.num_stocks();
356    /// println!("Processing {} stocks", num_stocks);
357    ///
358    /// // Ensure input data has correct length
359    /// let prices = vec![100.0; num_stocks];
360    /// // stream.push_data("close", &prices)?;
361    /// # Ok(())
362    /// # }
363    /// ```
364    ///
365    /// # Usage Notes
366    ///
367    /// - All input data arrays must have exactly this length
368    /// - All output data arrays will have exactly this length
369    /// - For optimal SIMD performance, this should be a multiple of 8
370    /// - The value is immutable for the lifetime of the context
371    pub fn num_stocks(&self) -> usize {
372        self.num_stocks
373    }
374}
375
376impl<'a> Drop for StreamContext<'a> {
377    fn drop(&mut self) {
378        if !self.handle.is_null() {
379            unsafe {
380                ffi::kunDestoryStream(self.handle);
381            }
382        }
383    }
384}