use clasp_core::{GesturePhase, PublishMessage, SignalType};
use dashmap::DashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tracing::debug;
/// Identifies one gesture stream: the address it is published on plus the
/// numeric gesture id carried by the message.
///
/// Two keys are equal (and hash identically) only when both fields match, so
/// gestures with the same id on different addresses, or different ids on the
/// same address, are distinct.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct GestureKey {
    /// Address the gesture messages are published on.
    pub address: String,
    /// Gesture identifier within that address.
    pub gesture_id: u32,
}

impl GestureKey {
    /// Creates a key, copying `address` into an owned `String`.
    pub fn new(address: &str, gesture_id: u32) -> Self {
        let address = String::from(address);
        Self {
            address,
            gesture_id,
        }
    }
}
/// Per-gesture bookkeeping held by the registry while a gesture is active.
#[derive(Debug, Clone)]
struct BufferedGesture {
    /// Most recent `Move` message that has been buffered but not yet forwarded.
    pending_move: Option<PublishMessage>,
    /// When this entry was created; `GestureRegistry::cleanup_stale` compares
    /// it against a maximum age.
    started_at: Instant,
    /// Timestamp consulted by `GestureRegistry::flush_stale` to decide when
    /// `pending_move` is due; reset to `None` once the move has been flushed.
    last_move_at: Option<Instant>,
}

impl BufferedGesture {
    /// Creates an entry with nothing buffered, stamped with the current time.
    fn new() -> Self {
        let started_at = Instant::now();
        Self {
            started_at,
            pending_move: None,
            last_move_at: None,
        }
    }
}
/// Outcome of handing a message to `GestureRegistry::process`.
#[derive(Debug)]
pub enum GestureResult {
/// Messages to forward now, in order. `process` returns the `Start` message
/// itself, a `Move` for a gesture that is not being tracked, or for
/// `End`/`Cancel` the buffered move (if any) followed by the terminal message.
Forward(Vec<PublishMessage>),
/// The `Move` was stored as the gesture's pending move; nothing to forward yet.
Buffered,
/// The message is not a gesture signal, or carries no phase; the registry
/// did not touch it.
PassThrough,
}
/// Tracks active gestures and coalesces their `Move` updates.
///
/// `Start`, `End` and `Cancel` are forwarded immediately. `Move` messages for
/// a tracked gesture are buffered (only the newest is kept) and released
/// either by [`GestureRegistry::flush_stale`] or together with the terminal
/// `End`/`Cancel` message.
pub struct GestureRegistry {
    /// Active gestures keyed by address and gesture id.
    gestures: DashMap<GestureKey, BufferedGesture>,
    /// Minimum time a buffered move waits before `flush_stale` releases it.
    flush_interval: Duration,
}

impl GestureRegistry {
    /// Creates an empty registry whose buffered moves become due after
    /// `flush_interval`.
    pub fn new(flush_interval: Duration) -> Self {
        Self {
            gestures: DashMap::new(),
            flush_interval,
        }
    }

    /// Routes one message through the registry.
    ///
    /// Returns [`GestureResult::PassThrough`] when the message is not a
    /// gesture signal or has no phase. A missing `id` is treated as id `0`,
    /// so id-less gestures on one address share a key with an explicit id 0.
    pub fn process(&self, msg: &PublishMessage) -> GestureResult {
        if msg.signal != Some(SignalType::Gesture) {
            return GestureResult::PassThrough;
        }
        let phase = match msg.phase {
            Some(phase) => phase,
            None => return GestureResult::PassThrough,
        };
        let gesture_id = msg.id.unwrap_or(0);
        let key = GestureKey::new(&msg.address, gesture_id);

        match phase {
            GesturePhase::Start => {
                // A repeated Start for a live key replaces the old entry,
                // including any move that was still buffered for it.
                self.gestures.insert(key, BufferedGesture::new());
                debug!("Gesture started: {}:{}", msg.address, gesture_id);
                GestureResult::Forward(vec![msg.clone()])
            }
            GesturePhase::Move => match self.gestures.get_mut(&key) {
                Some(mut entry) => {
                    // Stamp only the first move buffered since the last flush.
                    // Re-stamping on every move would keep pushing the deadline
                    // out, so a stream faster than `flush_interval` would never
                    // be flushed until End/Cancel.
                    if entry.last_move_at.is_none() {
                        entry.last_move_at = Some(Instant::now());
                    }
                    entry.pending_move = Some(msg.clone());
                    GestureResult::Buffered
                }
                // No Start was seen for this key: forward the move unbuffered.
                None => GestureResult::Forward(vec![msg.clone()]),
            },
            GesturePhase::End | GesturePhase::Cancel => {
                let pending = self
                    .gestures
                    .remove(&key)
                    .and_then(|(_, buffered)| buffered.pending_move);
                // The buffered move (if any) goes out before the terminal message.
                let mut to_forward = Vec::with_capacity(2);
                to_forward.extend(pending);
                to_forward.push(msg.clone());
                debug!("Gesture {:?}: {}:{}", phase, msg.address, gesture_id);
                GestureResult::Forward(to_forward)
            }
        }
    }

    /// Takes every buffered move that has waited at least `flush_interval`
    /// and returns them for forwarding. The gestures stay registered.
    pub fn flush_stale(&self) -> Vec<PublishMessage> {
        let now = Instant::now();
        let mut to_forward = Vec::new();
        for mut entry in self.gestures.iter_mut() {
            if let Some(buffered_since) = entry.last_move_at {
                // Saturating: a concurrent `process` call may have stamped the
                // entry after `now` was captured.
                if now.saturating_duration_since(buffered_since) >= self.flush_interval {
                    to_forward.extend(entry.pending_move.take());
                    entry.last_move_at = None;
                }
            }
        }
        to_forward
    }

    /// Number of gestures currently tracked.
    pub fn active_count(&self) -> usize {
        self.gestures.len()
    }

    /// Drops every gesture that was started `max_age` or longer ago.
    ///
    /// Any move still buffered for a dropped gesture is discarded, not
    /// forwarded.
    pub fn cleanup_stale(&self, max_age: Duration) {
        let now = Instant::now();
        self.gestures
            .retain(|_, gesture| now.saturating_duration_since(gesture.started_at) < max_age);
    }
}

impl Default for GestureRegistry {
    /// Registry with a 16 ms flush interval.
    fn default() -> Self {
        Self::new(Duration::from_millis(16))
    }
}
/// Spawns a background task that periodically flushes due moves from
/// `registry` into `flush_tx` and evicts gestures older than five minutes.
///
/// On every tick the task sends the non-empty result of
/// `registry.flush_stale()` as one batch, then calls `registry.cleanup_stale`.
/// The task ends once the receiving half of `flush_tx` has been dropped.
///
/// `interval` is clamped to at least 1 ms because `tokio::time::interval`
/// panics on a zero period.
pub fn spawn_flush_task(
    registry: Arc<GestureRegistry>,
    flush_tx: mpsc::UnboundedSender<Vec<PublishMessage>>,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    /// Gestures older than this are dropped even if no End/Cancel arrived.
    const MAX_GESTURE_AGE: Duration = Duration::from_secs(300);
    /// Smallest tick period accepted.
    const MIN_TICK: Duration = Duration::from_millis(1);

    let period = interval.max(MIN_TICK);
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(period);
        loop {
            ticker.tick().await;
            // Stop even when there is nothing to send; otherwise a dropped
            // receiver would only be noticed on the next non-empty flush.
            if flush_tx.is_closed() {
                break;
            }
            let to_flush = registry.flush_stale();
            if !to_flush.is_empty() && flush_tx.send(to_flush).is_err() {
                break;
            }
            registry.cleanup_stale(MAX_GESTURE_AGE);
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use clasp_core::Value;
    use std::collections::HashMap;

    /// Builds a gesture message with an empty map payload.
    fn make_gesture(address: &str, id: u32, phase: GesturePhase) -> PublishMessage {
        make_gesture_with_payload(address, id, phase, Value::Map(Default::default()))
    }

    /// Builds a gesture message carrying `payload`.
    fn make_gesture_with_payload(
        address: &str,
        id: u32,
        phase: GesturePhase,
        payload: Value,
    ) -> PublishMessage {
        PublishMessage {
            address: address.to_string(),
            signal: Some(SignalType::Gesture),
            phase: Some(phase),
            id: Some(id),
            value: None,
            payload: Some(payload),
            samples: None,
            rate: None,
            timestamp: None,
            timeline: None,
        }
    }

    /// Unwraps a `Forward` result, failing the test on any other variant.
    fn expect_forward(result: GestureResult) -> Vec<PublishMessage> {
        match result {
            GestureResult::Forward(msgs) => msgs,
            other => panic!("Expected Forward, got {:?}", other),
        }
    }

    #[test]
    fn test_start_forwards_immediately() {
        let registry = GestureRegistry::default();
        let msg = make_gesture("/touch", 1, GesturePhase::Start);
        let msgs = expect_forward(registry.process(&msg));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].phase, Some(GesturePhase::Start));
        assert_eq!(msgs[0].id, Some(1));
    }

    #[test]
    fn test_move_gets_buffered() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        let result = registry.process(&make_gesture("/touch", 1, GesturePhase::Move));
        assert!(
            matches!(result, GestureResult::Buffered),
            "Expected Buffered"
        );
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn test_move_replaces_previous_move() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        registry.process(&make_gesture_with_payload(
            "/touch",
            1,
            GesturePhase::Move,
            Value::Int(1),
        ));
        registry.process(&make_gesture_with_payload(
            "/touch",
            1,
            GesturePhase::Move,
            Value::Int(2),
        ));
        let end = make_gesture("/touch", 1, GesturePhase::End);
        let msgs = expect_forward(registry.process(&end));
        assert_eq!(msgs.len(), 2);
        match &msgs[0].payload {
            Some(Value::Int(v)) => assert_eq!(*v, 2),
            _ => panic!("Expected last move to have value 2"),
        }
        assert_eq!(msgs[1].phase, Some(GesturePhase::End));
    }

    #[test]
    fn test_end_flushes_buffered_move() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        registry.process(&make_gesture("/touch", 1, GesturePhase::Move));
        registry.process(&make_gesture("/touch", 1, GesturePhase::Move));
        let end = make_gesture("/touch", 1, GesturePhase::End);
        let msgs = expect_forward(registry.process(&end));
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].phase, Some(GesturePhase::Move));
        assert_eq!(msgs[1].phase, Some(GesturePhase::End));
        assert_eq!(registry.active_count(), 0);
    }

    #[test]
    fn test_end_without_move() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        let end = make_gesture("/touch", 1, GesturePhase::End);
        let msgs = expect_forward(registry.process(&end));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].phase, Some(GesturePhase::End));
    }

    #[test]
    fn test_cancel_flushes_buffered_move() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        registry.process(&make_gesture("/touch", 1, GesturePhase::Move));
        let cancel = make_gesture("/touch", 1, GesturePhase::Cancel);
        let msgs = expect_forward(registry.process(&cancel));
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].phase, Some(GesturePhase::Move));
        assert_eq!(msgs[1].phase, Some(GesturePhase::Cancel));
    }

    #[test]
    fn test_multiple_gestures_independent() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        registry.process(&make_gesture("/touch", 2, GesturePhase::Start));
        registry.process(&make_gesture("/touch", 1, GesturePhase::Move));
        registry.process(&make_gesture("/touch", 2, GesturePhase::Move));
        let msgs = expect_forward(registry.process(&make_gesture("/touch", 1, GesturePhase::End)));
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].id, Some(1));
        assert_eq!(msgs[1].id, Some(1));
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn test_different_addresses_independent() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch1", 1, GesturePhase::Start));
        registry.process(&make_gesture("/touch2", 1, GesturePhase::Start));
        registry.process(&make_gesture("/touch1", 1, GesturePhase::Move));
        registry.process(&make_gesture("/touch2", 1, GesturePhase::Move));
        let msgs =
            expect_forward(registry.process(&make_gesture("/touch1", 1, GesturePhase::End)));
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0].address, "/touch1");
        assert_eq!(msgs[1].address, "/touch1");
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn test_move_without_start() {
        let registry = GestureRegistry::default();
        let move_msg = make_gesture("/touch", 1, GesturePhase::Move);
        let msgs = expect_forward(registry.process(&move_msg));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].phase, Some(GesturePhase::Move));
    }

    #[test]
    fn test_rapid_start_end() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        let end = make_gesture("/touch", 1, GesturePhase::End);
        let msgs = expect_forward(registry.process(&end));
        assert_eq!(msgs.len(), 1);
        assert_eq!(msgs[0].phase, Some(GesturePhase::End));
    }

    #[test]
    fn test_concurrent_gestures_same_address() {
        let registry = GestureRegistry::default();
        for id in 1..=5 {
            registry.process(&make_gesture("/multitouch", id, GesturePhase::Start));
            registry.process(&make_gesture("/multitouch", id, GesturePhase::Move));
        }
        assert_eq!(registry.active_count(), 5);
        for id in 1..=5 {
            let end = make_gesture("/multitouch", id, GesturePhase::End);
            let msgs = expect_forward(registry.process(&end));
            assert_eq!(msgs.len(), 2);
            assert_eq!(msgs[0].id, Some(id));
            assert_eq!(msgs[1].id, Some(id));
        }
        assert_eq!(registry.active_count(), 0);
    }

    #[test]
    fn test_flush_stale() {
        let registry = GestureRegistry::new(Duration::from_millis(1));
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        registry.process(&make_gesture("/touch", 1, GesturePhase::Move));
        // Let the buffered move age past the 1 ms flush interval.
        std::thread::sleep(Duration::from_millis(5));
        let flushed = registry.flush_stale();
        assert_eq!(flushed.len(), 1);
        assert_eq!(flushed[0].phase, Some(GesturePhase::Move));
        // The move was taken by the first flush; nothing is left to release.
        let flushed2 = registry.flush_stale();
        assert!(flushed2.is_empty());
    }

    #[test]
    fn test_flush_stale_multiple_gestures() {
        let registry = GestureRegistry::new(Duration::from_millis(1));
        for id in 1..=3 {
            registry.process(&make_gesture("/touch", id, GesturePhase::Start));
            registry.process(&make_gesture("/touch", id, GesturePhase::Move));
        }
        std::thread::sleep(Duration::from_millis(5));
        let flushed = registry.flush_stale();
        assert_eq!(flushed.len(), 3);
        // Flushing releases moves but keeps the gestures registered.
        assert_eq!(registry.active_count(), 3);
    }

    #[test]
    fn test_cleanup_stale() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        assert_eq!(registry.active_count(), 1);
        registry.cleanup_stale(Duration::from_secs(300));
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn test_cleanup_stale_removes_expired() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/touch", 1, GesturePhase::Start));
        registry.process(&make_gesture("/touch", 2, GesturePhase::Start));
        assert_eq!(registry.active_count(), 2);
        // With a zero max age no gesture can be young enough to survive.
        registry.cleanup_stale(Duration::from_secs(0));
        assert_eq!(registry.active_count(), 0);
    }

    #[test]
    fn test_non_gesture_passes_through() {
        let registry = GestureRegistry::default();
        let msg = PublishMessage {
            address: "/test".to_string(),
            signal: Some(SignalType::Event),
            phase: None,
            id: None,
            value: Some(Value::Bool(true)),
            payload: None,
            samples: None,
            rate: None,
            timestamp: None,
            timeline: None,
        };
        assert!(
            matches!(registry.process(&msg), GestureResult::PassThrough),
            "Expected PassThrough"
        );
    }

    #[test]
    fn test_gesture_without_phase() {
        let registry = GestureRegistry::default();
        let msg = PublishMessage {
            address: "/test".to_string(),
            signal: Some(SignalType::Gesture),
            phase: None,
            id: Some(1),
            value: None,
            payload: None,
            samples: None,
            rate: None,
            timestamp: None,
            timeline: None,
        };
        assert!(
            matches!(registry.process(&msg), GestureResult::PassThrough),
            "Expected PassThrough for gesture without phase"
        );
    }

    #[test]
    fn test_gesture_without_id() {
        let registry = GestureRegistry::default();
        let msg = PublishMessage {
            address: "/test".to_string(),
            signal: Some(SignalType::Gesture),
            phase: Some(GesturePhase::Start),
            id: None,
            value: None,
            payload: None,
            samples: None,
            rate: None,
            timestamp: None,
            timeline: None,
        };
        let msgs = expect_forward(registry.process(&msg));
        assert_eq!(msgs.len(), 1);
        // The forwarded message is the original; the id is not rewritten.
        assert_eq!(msgs[0].id, None);
    }

    #[test]
    fn test_stress_many_gestures() {
        let registry = GestureRegistry::default();
        for id in 0..100 {
            registry.process(&make_gesture("/stress", id, GesturePhase::Start));
            registry.process(&make_gesture("/stress", id, GesturePhase::Move));
        }
        assert_eq!(registry.active_count(), 100);
        for id in 0..100 {
            registry.process(&make_gesture("/stress", id, GesturePhase::End));
        }
        assert_eq!(registry.active_count(), 0);
    }

    #[test]
    fn test_rapid_move_updates() {
        let registry = GestureRegistry::default();
        registry.process(&make_gesture("/rapid", 1, GesturePhase::Start));
        for i in 0..1000 {
            let mut fields = HashMap::new();
            fields.insert("index".to_string(), Value::Int(i));
            registry.process(&make_gesture_with_payload(
                "/rapid",
                1,
                GesturePhase::Move,
                Value::Map(fields),
            ));
        }
        let end = make_gesture("/rapid", 1, GesturePhase::End);
        let msgs = expect_forward(registry.process(&end));
        assert_eq!(msgs.len(), 2);
        // Only the last of the 1000 moves may survive coalescing.
        match &msgs[0].payload {
            Some(Value::Map(map)) => match map.get("index") {
                Some(Value::Int(idx)) => assert_eq!(*idx, 999),
                _ => panic!("Expected index in payload"),
            },
            _ => panic!("Expected Map payload"),
        }
    }
}