#![doc = include_str!("../README.md")]
use of_core::{AnalyticsSnapshot, DataQualityFlags, SignalSnapshot, SignalState};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SignalGateDecision {
Pass,
Block,
}
pub trait SignalModule: Send + Sync {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot);
fn snapshot(&self) -> SignalSnapshot;
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision;
}
#[derive(Debug)]
pub struct DeltaMomentumSignal {
latest: AnalyticsSnapshot,
threshold: i64,
}
impl DeltaMomentumSignal {
pub fn new(threshold: i64) -> Self {
Self {
latest: AnalyticsSnapshot::default(),
threshold,
}
}
}
impl Default for DeltaMomentumSignal {
fn default() -> Self {
Self::new(100)
}
}
impl SignalModule for DeltaMomentumSignal {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot) {
self.latest = ev.clone();
}
fn snapshot(&self) -> SignalSnapshot {
let (state, reason) = if self.latest.delta >= self.threshold {
(SignalState::LongBias, "delta_above_threshold")
} else if self.latest.delta <= -self.threshold {
(SignalState::ShortBias, "delta_below_threshold")
} else {
(SignalState::Neutral, "delta_inside_band")
};
SignalSnapshot {
module_id: "delta_momentum_v1",
state,
confidence_bps: 500,
quality_flags: 0,
reason: reason.to_string(),
}
}
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision {
default_quality_gate(q)
}
}
#[derive(Debug)]
pub struct VolumeImbalanceSignal {
latest: AnalyticsSnapshot,
threshold: i64,
}
impl VolumeImbalanceSignal {
pub fn new(threshold: i64) -> Self {
Self {
latest: AnalyticsSnapshot::default(),
threshold,
}
}
}
impl Default for VolumeImbalanceSignal {
fn default() -> Self {
Self::new(100)
}
}
impl SignalModule for VolumeImbalanceSignal {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot) {
self.latest = ev.clone();
}
fn snapshot(&self) -> SignalSnapshot {
let imbalance = self.latest.buy_volume - self.latest.sell_volume;
let (state, reason) = if imbalance >= self.threshold {
(SignalState::LongBias, "buy_volume_above_threshold")
} else if imbalance <= -self.threshold {
(SignalState::ShortBias, "sell_volume_above_threshold")
} else {
(SignalState::Neutral, "volume_inside_band")
};
SignalSnapshot {
module_id: "volume_imbalance_v1",
state,
confidence_bps: 550,
quality_flags: 0,
reason: reason.to_string(),
}
}
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision {
default_quality_gate(q)
}
}
#[derive(Debug)]
pub struct CumulativeDeltaSignal {
latest: AnalyticsSnapshot,
threshold: i64,
}
impl CumulativeDeltaSignal {
pub fn new(threshold: i64) -> Self {
Self {
latest: AnalyticsSnapshot::default(),
threshold,
}
}
}
impl Default for CumulativeDeltaSignal {
fn default() -> Self {
Self::new(250)
}
}
impl SignalModule for CumulativeDeltaSignal {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot) {
self.latest = ev.clone();
}
fn snapshot(&self) -> SignalSnapshot {
let (state, reason) = if self.latest.cumulative_delta >= self.threshold {
(SignalState::LongBias, "cumulative_delta_above_threshold")
} else if self.latest.cumulative_delta <= -self.threshold {
(SignalState::ShortBias, "cumulative_delta_below_threshold")
} else {
(SignalState::Neutral, "cumulative_delta_inside_band")
};
SignalSnapshot {
module_id: "cumulative_delta_v1",
state,
confidence_bps: 600,
quality_flags: 0,
reason: reason.to_string(),
}
}
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision {
default_quality_gate(q)
}
}
#[derive(Debug)]
pub struct AbsorptionSignal {
latest: AnalyticsSnapshot,
threshold: i64,
price_band: i64,
}
impl AbsorptionSignal {
pub fn new(threshold: i64, price_band: i64) -> Self {
Self {
latest: AnalyticsSnapshot::default(),
threshold,
price_band,
}
}
}
impl Default for AbsorptionSignal {
fn default() -> Self {
Self::new(150, 2)
}
}
impl SignalModule for AbsorptionSignal {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot) {
self.latest = ev.clone();
}
fn snapshot(&self) -> SignalSnapshot {
let poc_distance = (self.latest.last_price - self.latest.point_of_control).abs();
let (state, reason) = if poc_distance <= self.price_band && self.latest.delta <= -self.threshold {
(SignalState::LongBias, "sell_absorption_detected")
} else if poc_distance <= self.price_band && self.latest.delta >= self.threshold {
(SignalState::ShortBias, "buy_absorption_detected")
} else {
(SignalState::Neutral, "absorption_not_detected")
};
SignalSnapshot {
module_id: "absorption_v1",
state,
confidence_bps: 575,
quality_flags: 0,
reason: reason.to_string(),
}
}
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision {
default_quality_gate(q)
}
}
#[derive(Debug)]
pub struct ExhaustionSignal {
latest: AnalyticsSnapshot,
threshold: i64,
}
impl ExhaustionSignal {
pub fn new(threshold: i64) -> Self {
Self {
latest: AnalyticsSnapshot::default(),
threshold,
}
}
}
impl Default for ExhaustionSignal {
fn default() -> Self {
Self::new(150)
}
}
impl SignalModule for ExhaustionSignal {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot) {
self.latest = ev.clone();
}
fn snapshot(&self) -> SignalSnapshot {
let (state, reason) = if self.latest.delta >= self.threshold
&& self.latest.last_price <= self.latest.point_of_control
{
(SignalState::ShortBias, "buy_exhaustion_detected")
} else if self.latest.delta <= -self.threshold
&& self.latest.last_price >= self.latest.point_of_control
{
(SignalState::LongBias, "sell_exhaustion_detected")
} else {
(SignalState::Neutral, "exhaustion_not_detected")
};
SignalSnapshot {
module_id: "exhaustion_v1",
state,
confidence_bps: 565,
quality_flags: 0,
reason: reason.to_string(),
}
}
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision {
default_quality_gate(q)
}
}
#[derive(Debug)]
pub struct SweepDetectionSignal {
latest: AnalyticsSnapshot,
threshold: i64,
breakout_ticks: i64,
}
impl SweepDetectionSignal {
pub fn new(threshold: i64, breakout_ticks: i64) -> Self {
Self {
latest: AnalyticsSnapshot::default(),
threshold,
breakout_ticks,
}
}
}
impl Default for SweepDetectionSignal {
fn default() -> Self {
Self::new(150, 1)
}
}
impl SignalModule for SweepDetectionSignal {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot) {
self.latest = ev.clone();
}
fn snapshot(&self) -> SignalSnapshot {
let (state, reason) = if self.latest.delta >= self.threshold
&& self.latest.last_price >= self.latest.value_area_high + self.breakout_ticks
{
(SignalState::LongBias, "upside_sweep_detected")
} else if self.latest.delta <= -self.threshold
&& self.latest.last_price <= self.latest.value_area_low - self.breakout_ticks
{
(SignalState::ShortBias, "downside_sweep_detected")
} else {
(SignalState::Neutral, "sweep_not_detected")
};
SignalSnapshot {
module_id: "sweep_detection_v1",
state,
confidence_bps: 625,
quality_flags: 0,
reason: reason.to_string(),
}
}
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision {
default_quality_gate(q)
}
}
pub struct CompositeSignal {
modules: Vec<Box<dyn SignalModule>>,
}
impl CompositeSignal {
pub fn new(modules: Vec<Box<dyn SignalModule>>) -> Self {
Self { modules }
}
}
impl Default for CompositeSignal {
fn default() -> Self {
Self::new(vec![
Box::new(DeltaMomentumSignal::default()),
Box::new(VolumeImbalanceSignal::default()),
Box::new(CumulativeDeltaSignal::default()),
])
}
}
impl SignalModule for CompositeSignal {
fn on_analytics(&mut self, ev: &AnalyticsSnapshot) {
for module in &mut self.modules {
module.on_analytics(ev);
}
}
fn snapshot(&self) -> SignalSnapshot {
if self.modules.is_empty() {
return SignalSnapshot {
module_id: "composite_v1",
state: SignalState::Neutral,
confidence_bps: 0,
quality_flags: 0,
reason: "no_child_modules".to_string(),
};
}
let mut long_votes = 0_u16;
let mut short_votes = 0_u16;
let mut confidence_sum = 0_u32;
let mut long_modules = Vec::new();
let mut short_modules = Vec::new();
for module in &self.modules {
let snapshot = module.snapshot();
confidence_sum += snapshot.confidence_bps as u32;
match snapshot.state {
SignalState::LongBias => {
long_votes += 1;
long_modules.push(snapshot.module_id);
}
SignalState::ShortBias => {
short_votes += 1;
short_modules.push(snapshot.module_id);
}
SignalState::Neutral | SignalState::Blocked => {}
}
}
let (state, reason) = if long_votes > short_votes && long_votes > 0 {
(
SignalState::LongBias,
format!("composite_long:{}", long_modules.join(",")),
)
} else if short_votes > long_votes && short_votes > 0 {
(
SignalState::ShortBias,
format!("composite_short:{}", short_modules.join(",")),
)
} else {
(SignalState::Neutral, "composite_no_majority".to_string())
};
SignalSnapshot {
module_id: "composite_v1",
state,
confidence_bps: (confidence_sum / self.modules.len() as u32) as u16,
quality_flags: 0,
reason,
}
}
fn quality_gate(&self, q: DataQualityFlags) -> SignalGateDecision {
if self
.modules
.iter()
.any(|module| module.quality_gate(q) == SignalGateDecision::Block)
{
SignalGateDecision::Block
} else {
SignalGateDecision::Pass
}
}
}
fn default_quality_gate(q: DataQualityFlags) -> SignalGateDecision {
if q.intersects(
DataQualityFlags::STALE_FEED
| DataQualityFlags::SEQUENCE_GAP
| DataQualityFlags::OUT_OF_ORDER
| DataQualityFlags::ADAPTER_DEGRADED,
) {
SignalGateDecision::Block
} else {
SignalGateDecision::Pass
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn blocks_on_quality_issues() {
let s = DeltaMomentumSignal::default();
let decision = s.quality_gate(DataQualityFlags::SEQUENCE_GAP);
assert_eq!(decision, SignalGateDecision::Block);
}
#[test]
fn volume_imbalance_signal_uses_session_totals() {
let mut s = VolumeImbalanceSignal::new(10);
s.on_analytics(&AnalyticsSnapshot {
buy_volume: 30,
sell_volume: 15,
..Default::default()
});
let snapshot = s.snapshot();
assert_eq!(snapshot.module_id, "volume_imbalance_v1");
assert_eq!(snapshot.state, SignalState::LongBias);
}
#[test]
fn cumulative_delta_signal_uses_cumulative_threshold() {
let mut s = CumulativeDeltaSignal::new(20);
s.on_analytics(&AnalyticsSnapshot {
cumulative_delta: -25,
..Default::default()
});
let snapshot = s.snapshot();
assert_eq!(snapshot.module_id, "cumulative_delta_v1");
assert_eq!(snapshot.state, SignalState::ShortBias);
}
#[test]
fn absorption_signal_detects_failed_sell_push() {
let mut s = AbsorptionSignal::new(20, 1);
s.on_analytics(&AnalyticsSnapshot {
delta: -25,
last_price: 100,
point_of_control: 100,
..Default::default()
});
let snapshot = s.snapshot();
assert_eq!(snapshot.module_id, "absorption_v1");
assert_eq!(snapshot.state, SignalState::LongBias);
}
#[test]
fn exhaustion_signal_detects_failed_buy_follow_through() {
let mut s = ExhaustionSignal::new(20);
s.on_analytics(&AnalyticsSnapshot {
delta: 25,
last_price: 100,
point_of_control: 101,
..Default::default()
});
let snapshot = s.snapshot();
assert_eq!(snapshot.module_id, "exhaustion_v1");
assert_eq!(snapshot.state, SignalState::ShortBias);
}
#[test]
fn sweep_signal_detects_value_area_break() {
let mut s = SweepDetectionSignal::new(20, 1);
s.on_analytics(&AnalyticsSnapshot {
delta: 30,
last_price: 106,
value_area_high: 104,
..Default::default()
});
let snapshot = s.snapshot();
assert_eq!(snapshot.module_id, "sweep_detection_v1");
assert_eq!(snapshot.state, SignalState::LongBias);
}
#[test]
fn composite_signal_aggregates_child_votes() {
let mut s = CompositeSignal::new(vec![
Box::new(DeltaMomentumSignal::new(10)),
Box::new(VolumeImbalanceSignal::new(10)),
Box::new(CumulativeDeltaSignal::new(10)),
]);
s.on_analytics(&AnalyticsSnapshot {
delta: 15,
cumulative_delta: 20,
buy_volume: 30,
sell_volume: 10,
..Default::default()
});
let snapshot = s.snapshot();
assert_eq!(snapshot.module_id, "composite_v1");
assert_eq!(snapshot.state, SignalState::LongBias);
}
}