rustkernel_risk/
ring_messages.rs

//! Ring message types for Risk Analytics kernels.
//!
//! This module defines zero-copy Ring messages for GPU-native persistent actors.
//! Type IDs 600-699 are reserved for the Risk Analytics domain.
//!
//! ## Type ID Allocation
//!
//! - 600-619: Monte Carlo VaR messages
//! - 620-639: Portfolio risk aggregation messages
//! - 640-659: Credit risk messages
//! - 660-679: K2K streaming coordination messages
12
13use ringkernel_derive::RingMessage;
14use rkyv::{Archive, Deserialize, Serialize};
15use rustkernel_core::messages::MessageId;
16
// ============================================================================
// Monte Carlo VaR Ring Messages (600-619)
// ============================================================================
20
21/// Update position for streaming VaR calculation.
22#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
23#[archive(check_bytes)]
24#[message(type_id = 600)]
25pub struct UpdatePositionRing {
26    /// Message ID.
27    pub id: MessageId,
28    /// Asset ID.
29    pub asset_id: u64,
30    /// New position value (fixed-point: value * 100 for 2 decimal places).
31    pub value_fp: i64,
32    /// Expected return (fixed-point: value * 100_000_000).
33    pub expected_return_fp: i64,
34    /// Volatility (fixed-point: value * 100_000_000).
35    pub volatility_fp: i64,
36}
37
38/// Position update response.
39#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
40#[archive(check_bytes)]
41#[message(type_id = 601)]
42pub struct UpdatePositionResponse {
43    /// Original message ID.
44    pub request_id: u64,
45    /// Asset ID updated.
46    pub asset_id: u64,
47    /// Whether VaR needs recalculation.
48    pub var_stale: bool,
49}
50
51/// Query current VaR value.
52#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
53#[archive(check_bytes)]
54#[message(type_id = 602)]
55pub struct QueryVaRRing {
56    /// Message ID.
57    pub id: MessageId,
58    /// Confidence level (fixed-point: value * 100_000_000, e.g., 0.95 = 95_000_000).
59    pub confidence_fp: i64,
60    /// Holding period in days.
61    pub holding_period: u32,
62}
63
64/// VaR query response.
65#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
66#[archive(check_bytes)]
67#[message(type_id = 603)]
68pub struct QueryVaRResponse {
69    /// Original message ID.
70    pub request_id: u64,
71    /// Value at Risk (fixed-point: value * 100).
72    pub var_fp: i64,
73    /// Expected Shortfall (fixed-point: value * 100).
74    pub es_fp: i64,
75    /// Confidence level.
76    pub confidence_fp: i64,
77    /// Holding period.
78    pub holding_period: u32,
79    /// Whether this is a fresh calculation.
80    pub is_fresh: bool,
81}
82
83/// Trigger VaR recalculation.
84#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
85#[archive(check_bytes)]
86#[message(type_id = 604)]
87pub struct RecalculateVaRRing {
88    /// Message ID.
89    pub id: MessageId,
90    /// Number of simulations.
91    pub n_simulations: u32,
92    /// Confidence level (fixed-point).
93    pub confidence_fp: i64,
94    /// Holding period.
95    pub holding_period: u32,
96}
97
98/// VaR recalculation response.
99#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
100#[archive(check_bytes)]
101#[message(type_id = 605)]
102pub struct RecalculateVaRResponse {
103    /// Original message ID.
104    pub request_id: u64,
105    /// New VaR value (fixed-point).
106    pub var_fp: i64,
107    /// New ES value (fixed-point).
108    pub es_fp: i64,
109    /// Computation time in microseconds.
110    pub compute_time_us: u64,
111    /// Number of simulations used.
112    pub n_simulations: u32,
113}
114
// ============================================================================
// K2K Streaming Coordination Messages (660-679)
// ============================================================================
118
119/// K2K position batch update for distributed VaR.
120///
121/// Used when positions are partitioned across multiple workers.
122#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
123#[archive(check_bytes)]
124#[message(type_id = 660)]
125pub struct K2KPositionBatch {
126    /// Message ID.
127    pub id: MessageId,
128    /// Source worker ID.
129    pub source_worker: u64,
130    /// Batch sequence number.
131    pub batch_seq: u64,
132    /// Number of positions in batch.
133    pub position_count: u32,
134    /// Packed asset IDs (up to 8).
135    pub asset_ids: [u64; 8],
136    /// Packed values (fixed-point, up to 8).
137    pub values_fp: [i64; 8],
138}
139
140/// K2K partial VaR result from a worker.
141#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
142#[archive(check_bytes)]
143#[message(type_id = 661)]
144pub struct K2KPartialVaR {
145    /// Message ID.
146    pub id: MessageId,
147    /// Worker ID.
148    pub worker_id: u64,
149    /// Correlation ID for the calculation request.
150    pub correlation_id: u64,
151    /// Partial VaR contribution (fixed-point).
152    pub partial_var_fp: i64,
153    /// Partial ES contribution (fixed-point).
154    pub partial_es_fp: i64,
155    /// Number of positions processed.
156    pub positions_processed: u32,
157    /// Covariance contribution term (fixed-point).
158    pub cov_contribution_fp: i64,
159}
160
161/// K2K VaR aggregation request.
162///
163/// Sent to aggregator to combine partial VaR results.
164#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
165#[archive(check_bytes)]
166#[message(type_id = 662)]
167pub struct K2KVaRAggregation {
168    /// Message ID.
169    pub id: MessageId,
170    /// Correlation ID.
171    pub correlation_id: u64,
172    /// Number of workers expected.
173    pub expected_workers: u32,
174    /// Workers that have reported.
175    pub workers_reported: u32,
176    /// Aggregated VaR so far (fixed-point).
177    pub aggregated_var_fp: i64,
178}
179
180/// K2K VaR aggregation response.
181#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
182#[archive(check_bytes)]
183#[message(type_id = 663)]
184pub struct K2KVaRAggregationResponse {
185    /// Original correlation ID.
186    pub correlation_id: u64,
187    /// All workers reported.
188    pub complete: bool,
189    /// Final aggregated VaR (fixed-point).
190    pub final_var_fp: i64,
191    /// Final aggregated ES (fixed-point).
192    pub final_es_fp: i64,
193    /// Diversification benefit (fixed-point).
194    pub diversification_benefit_fp: i64,
195}
196
197/// K2K streaming market data update.
198///
199/// Broadcasts market data updates to all VaR workers.
200#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
201#[archive(check_bytes)]
202#[message(type_id = 664)]
203pub struct K2KMarketUpdate {
204    /// Message ID.
205    pub id: MessageId,
206    /// Update timestamp (microseconds since epoch).
207    pub timestamp_us: u64,
208    /// Asset ID.
209    pub asset_id: u64,
210    /// New price (fixed-point: value * 100).
211    pub price_fp: i64,
212    /// Implied volatility change (fixed-point: delta * 100_000_000).
213    pub vol_delta_fp: i64,
214}
215
216/// K2K market update acknowledgment.
217#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
218#[archive(check_bytes)]
219#[message(type_id = 665)]
220pub struct K2KMarketUpdateAck {
221    /// Original message ID.
222    pub request_id: u64,
223    /// Worker ID acknowledging.
224    pub worker_id: u64,
225    /// Updated VaR impact estimate (fixed-point).
226    pub var_impact_fp: i64,
227}
228
229/// K2K risk limit breach alert.
230///
231/// Sent when a position update causes VaR to breach limits.
232#[derive(Debug, Clone, Archive, Serialize, Deserialize, RingMessage)]
233#[archive(check_bytes)]
234#[message(type_id = 666)]
235pub struct K2KRiskLimitAlert {
236    /// Message ID.
237    pub id: MessageId,
238    /// Alert timestamp (microseconds since epoch).
239    pub timestamp_us: u64,
240    /// Alert severity: 1=warning, 2=breach, 3=critical.
241    pub severity: u8,
242    /// Current VaR (fixed-point).
243    pub current_var_fp: i64,
244    /// VaR limit (fixed-point).
245    pub var_limit_fp: i64,
246    /// Breach amount (fixed-point).
247    pub breach_amount_fp: i64,
248    /// Triggering asset ID.
249    pub trigger_asset_id: u64,
250}
251
// ============================================================================
// Helper Functions
// ============================================================================
255
/// Convert f64 to fixed-point i64 (8 decimal places).
///
/// The value is scaled by 100_000_000 and rounded to the nearest integer
/// (ties away from zero). Rounding matters: most decimal fractions are not
/// exactly representable in binary, so the scaled product frequently lands
/// just below the intended integer (e.g. `0.29 * 1e8` evaluates to
/// `28_999_999.999999996`), and a bare truncating cast would lose one unit.
///
/// The final `as` cast saturates: results beyond the `i64` range clamp to
/// `i64::MIN` / `i64::MAX`, and NaN becomes 0.
#[inline]
pub fn to_fixed_point(value: f64) -> i64 {
    (value * 100_000_000.0).round() as i64
}
261
/// Convert fixed-point i64 to f64.
///
/// Inverse of [`to_fixed_point`]: divides by 100_000_000 (8 decimal places).
/// The `i64 -> f64` cast is exact only for magnitudes up to 2^53; larger
/// inputs are rounded to the nearest representable `f64` before the division.
#[inline]
pub fn from_fixed_point(fp: i64) -> f64 {
    fp as f64 / 100_000_000.0
}
267
/// Convert value to fixed-point with 2 decimal places (for currency).
///
/// The value is scaled by 100 and rounded to the nearest integer (ties away
/// from zero). Rounding matters for money: `0.29 * 100.0` evaluates to
/// `28.999999999999996` in binary floating point, so a bare truncating cast
/// would silently drop a cent.
///
/// The final `as` cast saturates: results beyond the `i64` range clamp to
/// `i64::MIN` / `i64::MAX`, and NaN becomes 0.
#[inline]
pub fn to_currency_fp(value: f64) -> i64 {
    (value * 100.0).round() as i64
}
273
/// Convert currency fixed-point to f64.
///
/// Inverse of [`to_currency_fp`]: divides by 100 (2 decimal places).
/// The `i64 -> f64` cast is exact only for magnitudes up to 2^53; larger
/// inputs are rounded to the nearest representable `f64` before the division.
#[inline]
pub fn from_currency_fp(fp: i64) -> f64 {
    fp as f64 / 100.0
}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// An 8-decimal fixed-point round trip reproduces the input to within 1e-8.
    #[test]
    fn test_fixed_point_conversion() {
        let value = 0.95;
        let fp = to_fixed_point(value);
        let back = from_fixed_point(fp);
        assert!((value - back).abs() < 1e-8);
    }

    /// A 2-decimal currency round trip is exact for a binary-representable value.
    #[test]
    fn test_currency_conversion() {
        // Use a value that can be exactly represented
        let value = 50000.50;
        let fp = to_currency_fp(value);
        let back = from_currency_fp(fp);
        assert!((value - back).abs() < 0.01);
        assert_eq!(fp, 5000050); // 50000.50 * 100 = 5000050
    }

    /// `UpdatePositionRing` can be built with mixed-scale fixed-point fields.
    #[test]
    fn test_update_position_ring() {
        let msg = UpdatePositionRing {
            id: MessageId(1),
            asset_id: 100,
            value_fp: to_currency_fp(50000.0),
            expected_return_fp: to_fixed_point(0.08),
            volatility_fp: to_fixed_point(0.20),
        };
        assert_eq!(msg.asset_id, 100);
        assert!((from_currency_fp(msg.value_fp) - 50000.0).abs() < 0.01);
    }

    /// `K2KPartialVaR` keeps the worker and position-count fields it was given.
    #[test]
    fn test_k2k_partial_var() {
        let msg = K2KPartialVaR {
            id: MessageId(2),
            worker_id: 1,
            correlation_id: 12345,
            partial_var_fp: to_currency_fp(10000.0),
            partial_es_fp: to_currency_fp(12000.0),
            positions_processed: 50,
            cov_contribution_fp: to_fixed_point(0.0015),
        };
        assert_eq!(msg.worker_id, 1);
        assert_eq!(msg.positions_processed, 50);
    }

    /// `K2KRiskLimitAlert` carries severity and a currency-scaled breach amount.
    #[test]
    fn test_k2k_risk_limit_alert() {
        let msg = K2KRiskLimitAlert {
            id: MessageId(3),
            timestamp_us: 1234567890,
            severity: 2,
            current_var_fp: to_currency_fp(1_100_000.0),
            var_limit_fp: to_currency_fp(1_000_000.0),
            breach_amount_fp: to_currency_fp(100_000.0),
            trigger_asset_id: 42,
        };
        assert_eq!(msg.severity, 2);
        assert!((from_currency_fp(msg.breach_amount_fp) - 100_000.0).abs() < 0.01);
    }
}
344}