use super::error::{ErrorSeverity, SoftError, ValidationError};
use super::infer::{LookaheadContext, TypeInference};
use crate::core::{SharedUniverse, TypeId};
use crate::query::QueryEngine;
use parking_lot::RwLock;
use tokio::sync::{broadcast, mpsc};
/// Events that flow through a [`ValidationStream`].
///
/// The same enum is used both for input (via `ValidationStream::submit`) and
/// for the `event` echoed back inside each `StreamOutput`. The stream's
/// processor in this file only acts on `Token`, `Expression` and `Checkpoint`;
/// the remaining variants are accepted but ignored when submitted.
#[derive(Debug, Clone)]
pub enum ValidationEvent {
    /// A raw token and where it was found.
    Token {
        /// The token's text.
        text: String,
        /// Location of the token in the source.
        position: SourcePosition,
    },
    /// An expression to be type-inferred and validated.
    Expression {
        /// The expression tree.
        expr: Expression,
        /// Location of the expression in the source.
        position: SourcePosition,
    },
    /// A type inferred for a previously registered expression.
    TypeAnnotation {
        /// The expression the annotation refers to.
        expr_id: ExpressionId,
        /// The inferred type.
        inferred_type: TypeId,
        /// Confidence attached to the inference (valid range is not
        /// established in this file — confirm with the producer).
        confidence: f32,
    },
    /// The validation outcome for an expression.
    Validated {
        /// The expression that was validated.
        expr_id: ExpressionId,
        /// The outcome.
        result: ValidationResult,
    },
    /// A hard validation error attached to an expression.
    Error {
        /// The expression the error refers to.
        expr_id: ExpressionId,
        /// The error itself.
        error: ValidationError,
    },
    /// Request to snapshot the current expression table under `id`.
    Checkpoint {
        /// Identifier the snapshot is stored under.
        id: CheckpointId,
    },
}
/// A location in the source text.
///
/// `Default` produces a position with every field set to zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct SourcePosition {
    /// Line number (whether this is 0- or 1-based is not fixed by this file —
    /// confirm with the producer).
    pub line: u32,
    /// Column number within the line (same caveat on the base).
    pub column: u32,
    /// Offset from the start of the source (presumably in bytes — TODO confirm).
    pub offset: u32,
}

impl SourcePosition {
    /// Builds a position from its three components.
    ///
    /// This is a `const fn`, so positions can also be created in `const` and
    /// `static` items.
    pub const fn new(line: u32, column: u32, offset: u32) -> Self {
        Self {
            line,
            column,
            offset,
        }
    }
}
/// Opaque identifier of an expression registered with a validation stream.
///
/// The wrapped number is private; identifiers can be compared, hashed and
/// copied but not inspected from outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ExpressionId(u64);

impl ExpressionId {
    /// Wraps a raw numeric id.
    ///
    /// This is a `const fn`, so well-known ids can be declared as constants.
    pub const fn new(id: u64) -> Self {
        Self(id)
    }
}
/// Opaque identifier of a checkpoint taken on a validation stream.
///
/// The wrapped number is private; identifiers can be compared, hashed and
/// copied but not inspected from outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CheckpointId(u64);

impl CheckpointId {
    /// Wraps a raw numeric id.
    ///
    /// This is a `const fn`, so well-known ids can be declared as constants.
    pub const fn new(id: u64) -> Self {
        Self(id)
    }
}
/// Expression tree handed to the validator.
///
/// Child expressions are boxed so the enum has a finite size. The validator in
/// this file only inspects `Identifier`, `Binary`, `Call` and `Selector`; every
/// other variant currently validates to `ValidationResult::Unknown`.
#[derive(Debug, Clone)]
pub enum Expression {
    /// A bare name.
    Identifier(String),
    /// A literal constant.
    Literal(LiteralValue),
    /// A two-operand operation.
    Binary {
        /// The operator applied.
        op: BinaryOp,
        /// Left-hand operand.
        left: Box<Expression>,
        /// Right-hand operand.
        right: Box<Expression>,
    },
    /// A single-operand operation.
    Unary {
        /// The operator applied.
        op: UnaryOp,
        /// The operand.
        operand: Box<Expression>,
    },
    /// A call of `func` with `args`.
    Call {
        /// The callee expression.
        func: Box<Expression>,
        /// Arguments, in the order stored.
        args: Vec<Expression>,
    },
    /// Field selection on a base expression.
    Selector {
        /// The expression being selected from.
        base: Box<Expression>,
        /// Name of the selected field.
        field: String,
    },
    /// Indexing of `base` by `index`.
    Index {
        /// The expression being indexed.
        base: Box<Expression>,
        /// The index expression.
        index: Box<Expression>,
    },
    /// An expression paired with the type it is asserted to have.
    TypeAssertion {
        /// The expression under assertion.
        expr: Box<Expression>,
        /// The asserted type.
        typ: TypeId,
    },
    /// A composite value of type `typ` built from `elements`.
    Composite {
        /// Type of the composite.
        typ: TypeId,
        /// Element expressions, in the order stored.
        elements: Vec<Expression>,
    },
}
/// Payload of an `Expression::Literal`.
#[derive(Debug, Clone)]
pub enum LiteralValue {
    /// Signed 64-bit integer literal.
    Int(i64),
    /// 64-bit floating-point literal.
    Float(f64),
    /// String literal (owned).
    String(String),
    /// Boolean literal.
    Bool(bool),
    /// The nil literal; carries no payload.
    Nil,
}
/// Operators usable in an `Expression::Binary`.
///
/// Derives `PartialEq`/`Eq` like its sibling `UnaryOp`, so operators can be
/// compared directly instead of only through pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BinaryOp {
    // Arithmetic operators (the set the stream's `is_numeric_op` accepts).
    /// Addition.
    Add,
    /// Subtraction.
    Sub,
    /// Multiplication.
    Mul,
    /// Division.
    Div,
    /// Remainder.
    Rem,
    // Logical operators.
    /// Logical conjunction.
    And,
    /// Logical disjunction.
    Or,
    // Comparison operators.
    /// Equal.
    Eq,
    /// Not equal.
    Ne,
    /// Less than.
    Lt,
    /// Less than or equal.
    Le,
    /// Greater than.
    Gt,
    /// Greater than or equal.
    Ge,
    // Bitwise operators.
    /// Bitwise AND.
    BitAnd,
    /// Bitwise OR.
    BitOr,
    /// Bitwise XOR.
    BitXor,
    /// Shift left.
    Shl,
    /// Shift right.
    Shr,
}
/// Operators usable in an `Expression::Unary`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnaryOp {
    /// Logical negation.
    Not,
    /// Arithmetic negation.
    Neg,
    /// Unary plus.
    Pos,
    /// Bitwise complement.
    BitNot,
    /// Pointer dereference.
    Deref,
    /// Address-of.
    Addr,
    /// Receive (presumably a Go-style channel receive — confirm against the
    /// front end that produces these expressions).
    Recv,
}
/// Outcome of validating one expression.
#[derive(Debug, Clone)]
pub enum ValidationResult {
    /// The expression is well-typed.
    Valid {
        /// The expression's type.
        typ: TypeId,
    },
    /// The expression is ill-typed.
    Invalid {
        /// The hard errors that were found.
        errors: Vec<ValidationError>,
    },
    /// Validation could not fully decide; soft issues may be attached.
    Partial {
        /// Best-known type, if any.
        typ: Option<TypeId>,
        /// Non-fatal findings collected along the way.
        issues: Vec<SoftError>,
    },
    /// Nothing could be determined (for example, an expression kind the
    /// validator does not handle).
    Unknown,
}
/// Asynchronous validation pipeline.
///
/// [`ValidationStream::new`] spawns a Tokio task that drains the input
/// channel, processes each event, and re-broadcasts it (with timing data) as a
/// [`StreamOutput`].
///
/// The expression table, the checkpoint table and both id counters are
/// reference-counted so that the handle returned to the caller and the worker
/// task operate on the *same* state. Without that sharing the worker would
/// fill a private copy and [`ValidationStream::rollback`] on the caller's
/// handle could never find a checkpoint.
pub struct ValidationStream {
    /// Type universe shared with the inference and query engines.
    universe: SharedUniverse,
    /// Query engine built over `universe`; not read by any method in this file.
    query_engine: QueryEngine,
    /// Type inference engine used when an expression event is processed.
    inference: TypeInference,
    /// Producer side of the input channel (see [`ValidationStream::submit`]).
    input_tx: mpsc::Sender<ValidationEvent>,
    /// Receiver kept only so that new subscribers can be created from it.
    output_rx: broadcast::Receiver<StreamOutput>,
    /// Every processed expression, keyed by the id assigned to it.
    expressions: std::sync::Arc<RwLock<im::HashMap<ExpressionId, ExpressionState>>>,
    /// Snapshots recorded by checkpoint events.
    checkpoints: std::sync::Arc<RwLock<im::HashMap<CheckpointId, CheckpointState>>>,
    /// Source of fresh expression ids; starts at 1.
    expr_counter: std::sync::Arc<std::sync::atomic::AtomicU64>,
    /// Source of fresh checkpoint ids; starts at 1.
    checkpoint_counter: std::sync::Arc<std::sync::atomic::AtomicU64>,
}

/// Everything recorded about one processed expression.
#[derive(Debug, Clone)]
struct ExpressionState {
    /// The expression as submitted.
    expr: Expression,
    /// Where the expression was found.
    position: SourcePosition,
    /// Type produced by inference, if any.
    inferred_type: Option<TypeId>,
    /// Outcome of validation (always `Some` when written by this file).
    validation_result: Option<ValidationResult>,
}

/// Snapshot stored for a checkpoint.
#[derive(Debug, Clone)]
struct CheckpointState {
    /// Copy of the expression table at snapshot time.
    expressions: im::HashMap<ExpressionId, ExpressionState>,
    /// Scope reported by the universe at snapshot time. It is captured but
    /// not restored by `rollback` in this file.
    scope: crate::core::symbol::Scope,
}

/// An input event echoed to subscribers after it has been processed.
#[derive(Debug, Clone)]
pub struct StreamOutput {
    /// The event exactly as it was submitted.
    pub event: ValidationEvent,
    /// Instant at which processing of the event began.
    pub timestamp: std::time::Instant,
    /// Time spent processing the event, in microseconds (saturating).
    pub latency_us: u64,
}

impl ValidationStream {
    /// Creates a stream over `universe` and starts its background worker.
    ///
    /// Both the input and the output channel have a capacity of 1024.
    ///
    /// # Panics
    ///
    /// Calls `tokio::spawn`, which panics when invoked outside a Tokio
    /// runtime.
    pub fn new(universe: SharedUniverse) -> Self {
        let (input_tx, mut input_rx) = mpsc::channel(1024);
        let (output_tx, output_rx) = broadcast::channel(1024);
        let stream = Self {
            query_engine: QueryEngine::new(universe.clone()),
            inference: TypeInference::new(universe.clone()),
            universe,
            input_tx,
            output_rx,
            expressions: std::sync::Arc::new(RwLock::new(im::HashMap::new())),
            checkpoints: std::sync::Arc::new(RwLock::new(im::HashMap::new())),
            expr_counter: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
            checkpoint_counter: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(1)),
        };

        // The worker gets its own handle, but that handle points at the same
        // tables and counters as `stream`.
        //
        // NOTE(review): the worker's handle also owns a clone of `input_tx`,
        // so the input channel never closes and this task keeps running even
        // after every caller-side handle is dropped.
        let worker = stream.clone_ref();
        tokio::spawn(async move {
            while let Some(event) = input_rx.recv().await {
                let start = std::time::Instant::now();
                worker.process_event(event.clone()).await;
                // `as_micros` yields a u128; saturate rather than silently
                // wrapping when narrowing to u64.
                let latency_us = start.elapsed().as_micros().min(u128::from(u64::MAX)) as u64;
                // Best-effort: a send error only means nobody is listening.
                let _ = output_tx.send(StreamOutput {
                    event,
                    timestamp: start,
                    latency_us,
                });
            }
        });
        stream
    }

    /// Creates another handle onto the same stream.
    ///
    /// The expression table, checkpoint table and id counters are shared with
    /// `self`; the query and inference engines are rebuilt from the universe,
    /// and the output receiver is a fresh subscription.
    fn clone_ref(&self) -> Self {
        Self {
            universe: self.universe.clone(),
            query_engine: QueryEngine::new(self.universe.clone()),
            inference: TypeInference::new(self.universe.clone()),
            input_tx: self.input_tx.clone(),
            output_rx: self.output_rx.resubscribe(),
            expressions: std::sync::Arc::clone(&self.expressions),
            checkpoints: std::sync::Arc::clone(&self.checkpoints),
            expr_counter: std::sync::Arc::clone(&self.expr_counter),
            checkpoint_counter: std::sync::Arc::clone(&self.checkpoint_counter),
        }
    }

    /// Queues `event` for processing by the background worker.
    ///
    /// Waits while the input channel is full.
    ///
    /// # Errors
    ///
    /// Returns the event back inside `SendError` if the receiving side of the
    /// input channel has been closed.
    pub async fn submit(
        &self,
        event: ValidationEvent,
    ) -> Result<(), mpsc::error::SendError<ValidationEvent>> {
        self.input_tx.send(event).await
    }

    /// Returns a new receiver for processed events.
    ///
    /// The receiver is created with `resubscribe`, so it only observes outputs
    /// sent after this call.
    pub fn subscribe(&self) -> broadcast::Receiver<StreamOutput> {
        self.output_rx.resubscribe()
    }

    /// Dispatches one event to its handler.
    ///
    /// Only token, expression and checkpoint events have handlers; the other
    /// variants are deliberately ignored.
    async fn process_event(&self, event: ValidationEvent) {
        match event {
            ValidationEvent::Token { text, position } => {
                self.process_token(&text, position).await;
            }
            ValidationEvent::Expression { expr, position } => {
                self.process_expression(expr, position).await;
            }
            ValidationEvent::Checkpoint { id } => {
                self.create_checkpoint(id).await;
            }
            ValidationEvent::TypeAnnotation { .. }
            | ValidationEvent::Validated { .. }
            | ValidationEvent::Error { .. } => {}
        }
    }

    /// Token handler; currently a no-op.
    async fn process_token(&self, _text: &str, _position: SourcePosition) {}

    /// Infers and validates `expr`, then records the outcome under a fresh id.
    async fn process_expression(&self, expr: Expression, position: SourcePosition) {
        let id = self.next_expr_id();
        let inferred = self.inference.infer(&expr, &LookaheadContext::default());
        let result = self.validate_expression(&expr, inferred.as_ref());
        let state = ExpressionState {
            expr,
            position,
            inferred_type: inferred,
            validation_result: Some(result),
        };
        // No `.await` follows, so the guard is never held across a suspension
        // point.
        let mut expressions = self.expressions.write();
        *expressions = expressions.update(id, state);
    }

    /// Validates `expr` against an optional expected type.
    ///
    /// Identifiers, binary operations, calls and selectors have dedicated
    /// validators; every other expression kind yields
    /// `ValidationResult::Unknown`.
    fn validate_expression(
        &self,
        expr: &Expression,
        expected_type: Option<&TypeId>,
    ) -> ValidationResult {
        match expr {
            Expression::Identifier(name) => self.validate_identifier(name, expected_type),
            Expression::Binary { op, left, right } => {
                self.validate_binary_op(*op, left, right, expected_type)
            }
            Expression::Call { func, args } => self.validate_call(func, args, expected_type),
            Expression::Selector { base, field } => {
                self.validate_selector(base, field, expected_type)
            }
            Expression::Literal(_)
            | Expression::Unary { .. }
            | Expression::Index { .. }
            | Expression::TypeAssertion { .. }
            | Expression::Composite { .. } => ValidationResult::Unknown,
        }
    }

    /// Validates an identifier.
    ///
    /// No symbol lookup is performed here: the result is always `Partial`,
    /// carrying the expected type (if any) and a hint-level "unresolved" note.
    fn validate_identifier(&self, name: &str, expected: Option<&TypeId>) -> ValidationResult {
        ValidationResult::Partial {
            typ: expected.copied(),
            issues: vec![SoftError {
                message: format!("Unresolved identifier: {}", name),
                suggestion: None,
                severity: ErrorSeverity::Hint,
            }],
        }
    }

    /// Validates a binary operation from the results of its operands.
    ///
    /// * both operands `Valid`: `Valid` with the operator's result type when
    ///   the operand types are compatible, otherwise `Invalid` with a
    ///   `TypeMismatch`;
    /// * at least one operand `Partial`: `Partial` with the expected type and
    ///   no issues (operand issues are not carried over);
    /// * anything else: `Unknown`.
    fn validate_binary_op(
        &self,
        op: BinaryOp,
        left: &Expression,
        right: &Expression,
        expected: Option<&TypeId>,
    ) -> ValidationResult {
        // Operands are validated left first, then right, without an expected
        // type.
        let operands = (
            self.validate_expression(left, None),
            self.validate_expression(right, None),
        );
        match operands {
            (
                ValidationResult::Valid { typ: left_type },
                ValidationResult::Valid { typ: right_type },
            ) => {
                if self.types_compatible_for_op(op, left_type, right_type) {
                    ValidationResult::Valid {
                        typ: self.result_type_for_op(op, left_type, right_type),
                    }
                } else {
                    ValidationResult::Invalid {
                        errors: vec![ValidationError::TypeMismatch {
                            expected: left_type,
                            found: right_type,
                        }],
                    }
                }
            }
            (ValidationResult::Partial { .. }, _) | (_, ValidationResult::Partial { .. }) => {
                ValidationResult::Partial {
                    typ: expected.copied(),
                    issues: vec![],
                }
            }
            _ => ValidationResult::Unknown,
        }
    }

    /// Returns `true` when `op` may be applied to the given operand types.
    ///
    /// Identical types are always compatible; arithmetic operators are
    /// accepted for any pair of types.
    fn types_compatible_for_op(&self, op: BinaryOp, left: TypeId, right: TypeId) -> bool {
        left == right || self.is_numeric_op(op)
    }

    /// Returns `true` for `Add`, `Sub`, `Mul`, `Div` and `Rem`.
    fn is_numeric_op(&self, op: BinaryOp) -> bool {
        matches!(
            op,
            BinaryOp::Add | BinaryOp::Sub | BinaryOp::Mul | BinaryOp::Div | BinaryOp::Rem
        )
    }

    /// Result type of a binary operation: currently always the left operand's
    /// type, whatever the operator.
    fn result_type_for_op(&self, _op: BinaryOp, left: TypeId, _right: TypeId) -> TypeId {
        left
    }

    /// Validates a call expression.
    ///
    /// The callee and every argument are validated, but their results are
    /// discarded; the call itself is always `Partial` with the expected type
    /// and no issues.
    fn validate_call(
        &self,
        func: &Expression,
        args: &[Expression],
        expected: Option<&TypeId>,
    ) -> ValidationResult {
        let _func_result = self.validate_expression(func, None);
        for arg in args {
            let _ = self.validate_expression(arg, None);
        }
        ValidationResult::Partial {
            typ: expected.copied(),
            issues: vec![],
        }
    }

    /// Validates a field selection.
    ///
    /// When the base is `Valid` the selection is `Partial` with a hint naming
    /// the field; any other base result is returned unchanged.
    fn validate_selector(
        &self,
        base: &Expression,
        field: &str,
        expected: Option<&TypeId>,
    ) -> ValidationResult {
        match self.validate_expression(base, None) {
            ValidationResult::Valid { .. } => ValidationResult::Partial {
                typ: expected.copied(),
                issues: vec![SoftError {
                    message: format!("Field lookup: .{}", field),
                    suggestion: None,
                    severity: ErrorSeverity::Hint,
                }],
            },
            other => other,
        }
    }

    /// Stores a snapshot of the expression table and the universe's current
    /// scope under `id`, replacing any snapshot already stored for that id.
    async fn create_checkpoint(&self, id: CheckpointId) {
        // The read guard is a temporary and is released before the write lock
        // on the checkpoint table is taken.
        let expressions = self.expressions.read().clone();
        let scope = self.universe.current_scope();
        let checkpoint = CheckpointState { expressions, scope };
        let mut checkpoints = self.checkpoints.write();
        *checkpoints = checkpoints.update(id, checkpoint);
    }

    /// Restores the expression table saved under `checkpoint_id`.
    ///
    /// Returns `true` when the checkpoint exists and was applied, `false`
    /// otherwise. Only the expression table is restored; the saved scope and
    /// the id counters are left untouched.
    pub fn rollback(&self, checkpoint_id: CheckpointId) -> bool {
        // Copy just the saved table and release the checkpoint lock before
        // taking the write lock on the live table.
        let snapshot = self
            .checkpoints
            .read()
            .get(&checkpoint_id)
            .map(|checkpoint| checkpoint.expressions.clone());
        match snapshot {
            Some(expressions) => {
                *self.expressions.write() = expressions;
                true
            }
            None => false,
        }
    }

    /// Allocates the next expression id.
    fn next_expr_id(&self) -> ExpressionId {
        ExpressionId::new(
            self.expr_counter
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst),
        )
    }

    /// Allocates a checkpoint id and queues a request to snapshot under it.
    ///
    /// The snapshot is taken by the background worker when the request is
    /// reached in the input queue, i.e. after every event submitted before it.
    /// This method returns as soon as the request is queued, so a `rollback`
    /// issued immediately afterwards may not find the checkpoint yet. If the
    /// request cannot be queued the id is still returned, and rolling back to
    /// it reports `false`.
    pub async fn checkpoint(&self) -> CheckpointId {
        let id = CheckpointId::new(
            self.checkpoint_counter
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst),
        );
        let _: Result<(), _> = self.submit(ValidationEvent::Checkpoint { id }).await;
        id
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::TypeUniverse;
    use std::sync::Arc;

    /// Smoke test: a stream can be built on a fresh universe and accepts an
    /// identifier expression without a send error.
    #[tokio::test]
    async fn test_stream_creation() {
        let stream = ValidationStream::new(Arc::new(TypeUniverse::new()));
        let event = ValidationEvent::Expression {
            expr: Expression::Identifier("x".to_string()),
            position: SourcePosition::default(),
        };
        stream.submit(event).await.unwrap();
    }
}