Cortex Flow

A high-performance, asynchronous data processing pipeline library written in Rust. It is designed for RAG applications and provides ready-to-use, OpenAI-compatible APIs, with a focus on OpenRouter.ai.
Features
- Async design for high throughput
- Flexible flow composition with branching
- Modular and extensible architecture
- Built-in feedback mechanism
- Type-safe data processing
- Performance benchmarking
- Comprehensive testing and documentation
- Example processors, sources, conditions and best practices
- Safe Rust
Installation
Add this to your Cargo.toml:
[dependencies]
cortex-ai = "0.1.0"
Quick Start
use cortex_ai::{Flow, Source, Processor, Condition};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let flow = Flow::new()
        .source(MySource)
        .process(MyProcessor)
        .when(MyCondition)
        .process(ThenProcessor)
        .otherwise()
        .process(ElseProcessor)
        .end();

    flow.run_stream(shutdown_rx).await?;
    Ok(())
}
Usage Examples
Simple Processing Flow
let flow = Flow::new()
    .source(DataSource::new())
    .process(DataProcessor::new())
    .run_stream(shutdown_rx)
    .await?;
Conditional Branching
let flow = Flow::new()
    .source(DataSource::new())
    .when(ValidationCondition::new())
    .process(ValidDataProcessor::new())
    .otherwise()
    .process(InvalidDataProcessor::new())
    .end();
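To make the branching semantics concrete, here is a minimal synchronous sketch that uses only the standard library. It does not use or reproduce cortex-ai's implementation; `is_valid`, `route` and `route_all` are hypothetical stand-ins for a condition, the `when`/`otherwise` split, and a flow over a batch of items.

```rust
/// Hypothetical stand-in for a condition: decides which branch an item takes.
fn is_valid(input: &str) -> bool {
    !input.trim().is_empty()
}

/// Sends one item through `then_branch` if the condition holds,
/// otherwise through `else_branch`.
fn route<T, U>(
    input: T,
    condition: impl Fn(&T) -> bool,
    then_branch: impl Fn(T) -> U,
    else_branch: impl Fn(T) -> U,
) -> U {
    if condition(&input) {
        then_branch(input)
    } else {
        else_branch(input)
    }
}

/// Applies the routing to a batch of items, preserving their order.
fn route_all(inputs: Vec<String>) -> Vec<String> {
    inputs
        .into_iter()
        .map(|item| {
            route(
                item,
                |s: &String| is_valid(s),
                |s| format!("valid:{s}"),
                |s| format!("invalid:{s}"),
            )
        })
        .collect()
}
```

Each item takes exactly one of the two branches, which is the property the `when(...)` / `otherwise()` / `end()` chain expresses.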
Performance
Benchmark results for different flow configurations:
- Simple Flow: ~1M messages/sec
- Branching Flow: ~800K messages/sec
- Complex Flow: ~500K messages/sec
- Concurrent Flow: ~700K messages/sec
Run benchmarks with:
just bench
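For readers who want to sanity-check a messages/sec figure on their own workload, the following standard-library sketch shows how such a number can be derived. It is not the project's benchmark harness; `messages_per_sec` and `measure` are hypothetical helpers, and `work` stands in for one pass of a message through a flow.

```rust
use std::time::{Duration, Instant};

/// Converts a message count and an elapsed time into messages per second.
fn messages_per_sec(messages: u64, elapsed: Duration) -> f64 {
    messages as f64 / elapsed.as_secs_f64()
}

/// Runs `work` once per message and reports the observed throughput.
/// `work` is a hypothetical stand-in for processing a single message.
fn measure(messages: u64, mut work: impl FnMut(u64)) -> f64 {
    let start = Instant::now();
    for i in 0..messages {
        work(i);
    }
    messages_per_sec(messages, start.elapsed())
}
```

A single timed loop like this is noisy; a dedicated benchmark runner gives more stable numbers.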
Development
Prerequisites
- Rust 1.80 or higher
- Cargo
- just
Building
just build
Testing
just test
Formatting & Linting
just check
Running the sample project
just sample
Project Layout
cortex-ai/
├── src/
│   ├── composer/       # Flow composition
│   ├── flow/           # Core flow components
│   └── lib.rs          # Public API
├── tests/              # unit tests
└── benches/            # performance benchmarks

cortex-processors/
├── src/
│   ├── processors/     # Processor implementations
│   └── lib.rs          # Public API
├── tests/              # unit tests
└── benches/            # performance benchmarks

cortex-sources/
├── src/
│   ├── sources/        # Source implementations
│   └── lib.rs          # Public API
├── tests/              # unit tests
└── benches/            # performance benchmarks

cortex-conditions/
├── src/
│   ├── conditions/     # Condition implementations
│   └── lib.rs          # Public API
├── tests/              # unit tests
└── benches/            # performance benchmarks
Implementing a New Processor
To add a new processor to your flow, implement the Processor trait:
use cortex_ai::{Flow, FlowComponent, Processor, FlowFuture};

#[derive(Clone)]
struct MultiplicationProcessor {
    multiplier: i32,
}

impl FlowComponent for MultiplicationProcessor {
    type Input = i32;
    type Output = i32;
    type Error = MyError;
}

impl Processor for MultiplicationProcessor {
    fn process(&self, input: Self::Input) -> FlowFuture<'_, Self::Output, Self::Error> {
        // Copy the field so the returned future does not borrow `self`.
        let multiplier = self.multiplier;
        Box::pin(async move {
            Ok(input * multiplier)
        })
    }
}

// Using the processor in a flow:
let flow = Flow::new()
    .source(MySource::new())
    .process(MultiplicationProcessor { multiplier: 2 })
    .process(AnotherProcessor::new());
Best Practices
1. Make your processor `Clone` if possible
2. Keep processing functions pure and stateless
3. Use appropriate error types
4. Document input/output requirements
5. Add unit tests for your processor
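Points 2, 3 and 5 work well together: if the core of a processor is a pure function with a specific error type, it can be unit-tested without any runtime. The sketch below uses only the standard library; `MultiplyError` and `multiply` are hypothetical names, not part of cortex-ai.

```rust
use std::fmt;

/// Hypothetical error type for a multiplication step.
#[derive(Debug, PartialEq)]
enum MultiplyError {
    Overflow { input: i32, multiplier: i32 },
}

impl fmt::Display for MultiplyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            MultiplyError::Overflow { input, multiplier } => {
                write!(f, "{input} * {multiplier} overflows i32")
            }
        }
    }
}

impl std::error::Error for MultiplyError {}

/// Pure core of a processor: no I/O and no shared state, so the same
/// input always yields the same output.
fn multiply(input: i32, multiplier: i32) -> Result<i32, MultiplyError> {
    input
        .checked_mul(multiplier)
        .ok_or(MultiplyError::Overflow { input, multiplier })
}
```

A processor's `process` method could then wrap a call to such a function in its returned future, keeping the async layer thin.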
Example: String Processor
#[derive(Clone)]
struct StringTransformer;

impl FlowComponent for StringTransformer {
    type Input = String;
    type Output = String;
    type Error = ProcessError;
}

impl Processor for StringTransformer {
    fn process(&self, input: Self::Input) -> FlowFuture<'_, Self::Output, Self::Error> {
        Box::pin(async move {
            Ok(input.to_uppercase())
        })
    }
}

#[cfg(test)]
mod tests {
    // Bring StringTransformer and the Processor trait into scope.
    use super::*;

    #[tokio::test]
    async fn test_string_transformer() {
        let processor = StringTransformer;
        let result = processor.process("hello".to_string()).await;
        assert_eq!(result.unwrap(), "HELLO");
    }
}
Implementing a New Source
To create a new data source for your flow, implement the Source trait:
use cortex_ai::{Flow, FlowComponent, Source, FlowFuture};

#[derive(Clone)]
struct MySource {}

impl FlowComponent for MySource {
    type Input = MyInput;
    type Output = MyOutput;
    type Error = MyError;
}

impl Source for MySource {
    fn source(&self) -> FlowFuture<'_, Self::Output, Self::Error> {
        Box::pin(async move {
            Ok(MyOutput)
        })
    }
}

// Using the source in a flow:
let flow = Flow::new()
    .source(MySource::new())
    .process(MyProcessor)
    .run_stream(shutdown_rx)
    .await?;
Example: Kafka Source
use cortex_ai::{Flow, FlowComponent, Source, FlowFuture, SourceOutput};
use flume::bounded;

#[derive(Clone)]
struct KafkaSource {
    topic: String,
    broker: String,
}

impl FlowComponent for KafkaSource {
    type Input = ();
    type Output = String;
    type Error = SourceError;
}

impl Source for KafkaSource {
    fn stream(&self) -> FlowFuture<'_, SourceOutput<Self::Output, Self::Error>, Self::Error> {
        // A real implementation would use these to set up a Kafka consumer.
        let topic = self.topic.clone();
        let broker = self.broker.clone();
        Box::pin(async move {
            let (tx, rx) = bounded(1000);
            let (feedback_tx, feedback_rx) = bounded(1000);

            // Feedback task: receives the processing result of each message.
            tokio::spawn(async move {
                while let Ok(result) = feedback_rx.recv_async().await {
                    match result {
                        Ok(data) => {
                            println!("Message processed: {}", data);
                        }
                        Err(e) => {
                            println!("Processing failed: {}", e);
                        }
                    }
                }
            });

            // Producer task: emits sample messages until the receiver is dropped.
            tokio::spawn(async move {
                loop {
                    let msg = "sample message".to_string();
                    // send_async waits for free capacity without blocking the runtime thread.
                    if tx.send_async(Ok(msg)).await.is_err() {
                        break;
                    }
                }
            });

            Ok(SourceOutput {
                receiver: rx,
                feedback: feedback_tx,
            })
        })
    }
}

// Using the source in a flow:
let source = KafkaSource {
    topic: "my-topic".to_string(),
    broker: "localhost:9092".to_string(),
};

let flow = Flow::new()
    .source(source)
    .process(MyProcessor::new());
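The feedback task in the example only prints each result. A source that needs at-least-once delivery would typically turn acknowledgments into commits. The sketch below shows one way to do that with the standard library alone. It assumes each message carries a monotonically increasing offset, which the example above does not model; `CommitTracker` is a hypothetical helper, not a cortex-ai or Kafka client type.

```rust
use std::collections::BTreeSet;

/// Tracks acknowledged offsets and reports the highest offset that is safe
/// to commit. Assumes offsets start at `first_offset` and increase by one.
struct CommitTracker {
    /// Lowest offset that has not been acknowledged yet.
    next: u64,
    /// Acknowledged offsets that are still ahead of `next`.
    pending: BTreeSet<u64>,
}

impl CommitTracker {
    fn new(first_offset: u64) -> Self {
        Self { next: first_offset, pending: BTreeSet::new() }
    }

    /// Records one acknowledgment. Returns the new committable offset if the
    /// contiguous run of acknowledged offsets grew, otherwise `None`.
    fn ack(&mut self, offset: u64) -> Option<u64> {
        if offset < self.next {
            return None; // duplicate acknowledgment, already covered
        }
        self.pending.insert(offset);
        let before = self.next;
        // Advance past every offset that is now acknowledged without a gap.
        while self.pending.remove(&self.next) {
            self.next += 1;
        }
        if self.next > before {
            Some(self.next - 1)
        } else {
            None
        }
    }
}
```

Committing only the contiguous prefix means a message that is still in flight is never skipped, even when later messages finish first.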
Best Practices for Sources
1. Feedback Handling
- Always process feedback to track message status
- Implement proper commit/acknowledgment logic
- Handle errors appropriately
2. Channel Sizing
- Choose appropriate buffer sizes
- Consider backpressure mechanisms
- Monitor channel capacity
3. Error Handling
- Use specific error types
- Provide context in errors
- Consider retry strategies
4. Resource Management
- Clean up resources when channel closes
- Handle shutdown gracefully
- Monitor resource usage
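The channel sizing advice can be illustrated with a bounded channel that refuses new items once its buffer is full. This sketch uses the standard library's `sync_channel` as a stand-in for the `flume::bounded` channel used in the Kafka example; `fill_bounded` is a hypothetical helper written for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `items` values into a bounded channel of size `capacity`
/// without blocking, and returns the (accepted, rejected) counts.
/// Nothing drains the channel here, so it fills up after `capacity` items.
fn fill_bounded(capacity: usize, items: usize) -> (usize, usize) {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..items {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // Buffer full: a real source would wait, slow down, or shed load.
            Err(TrySendError::Full(_)) => rejected += 1,
            // Receiver dropped: stop producing.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

Counting rejected sends like this is also a simple way to monitor how often a buffer size is too small for the consumer's pace.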
Implementing a New Condition
Here's an example of implementing a condition that checks if a key exists in Redis:
use cortex_ai::{Flow, FlowComponent, Condition, ConditionFuture};
use redis::AsyncCommands;
use std::sync::Arc;

#[derive(Clone)]
struct RedisExistsCondition {
    client: Arc<redis::Client>,
    prefix: String,
}

impl RedisExistsCondition {
    pub fn new(redis_url: &str, prefix: &str) -> redis::RedisResult<Self> {
        Ok(Self {
            client: Arc::new(redis::Client::open(redis_url)?),
            prefix: prefix.to_string(),
        })
    }
}

#[derive(Debug, thiserror::Error)]
pub enum RedisConditionError {
    #[error("Redis error: {0}")]
    Redis(#[from] redis::RedisError),
    #[error("Invalid input: {0}")]
    InvalidInput(String),
}

impl FlowComponent for RedisExistsCondition {
    type Input = String;
    type Output = String;
    type Error = RedisConditionError;
}

impl Condition for RedisExistsCondition {
    fn evaluate(&self, input: Self::Input) -> ConditionFuture<'_, Self::Output, Self::Error> {
        let client = self.client.clone();
        let key = format!("{}:{}", self.prefix, input);
        Box::pin(async move {
            let mut conn = client.get_async_connection().await?;
            let exists: bool = conn.exists(&key).await?;
            // Return the check result together with the original input.
            Ok((exists, Some(input)))
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let condition = RedisExistsCondition::new(
        "redis://localhost:6379",
        "myapp:cache",
    )?;

    let flow = Flow::new()
        .source(MessageSource::new())
        .when(condition)
        .process(CacheHitProcessor::new())
        .otherwise()
        .process(CacheMissProcessor::new())
        .end();

    flow.run_stream(shutdown_rx).await?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_redis_condition() {
        let condition = RedisExistsCondition::new(
            "redis://localhost:6379",
            "test",
        ).unwrap();

        // Seed a key so that the first lookup succeeds.
        let mut conn = condition.client.get_async_connection().await.unwrap();
        let _: () = conn.set("test:key1", "value1").await.unwrap();

        // Existing key
        let result = condition.evaluate("key1".to_string()).await.unwrap();
        assert!(result.0);
        assert_eq!(result.1, Some("key1".to_string()));

        // Missing key
        let result = condition.evaluate("key2".to_string()).await.unwrap();
        assert!(!result.0);
        assert_eq!(result.1, Some("key2".to_string()));
    }
}
Best Practices for Conditions
- Error Handling:
  - Use custom error types with thiserror
  - Provide meaningful error messages
  - Handle all possible failure cases
- Resource Management:
  - Share connections using Arc when possible
  - Handle connection failures gracefully
  - Clean up resources properly
- Testing:
  - Test both positive and negative cases
  - Test error conditions
  - Use test fixtures or mocks for external services
- Performance:
  - Cache connections when possible
  - Use connection pools for databases
  - Consider timeout settings
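The testing advice about mocks can be sketched as follows: put the external lookup behind a small trait, then test the condition logic against in-memory doubles instead of a live Redis server. Everything here is hypothetical and standard-library only; `KeyStore`, `InMemoryStore`, `FailingStore` and `key_exists` are illustrative names, and the sketch is synchronous for brevity.

```rust
use std::collections::HashSet;

/// Hypothetical abstraction over the external lookup a condition depends on.
trait KeyStore {
    fn exists(&self, key: &str) -> Result<bool, String>;
}

/// In-memory test double; needs no network access.
struct InMemoryStore {
    keys: HashSet<String>,
}

impl KeyStore for InMemoryStore {
    fn exists(&self, key: &str) -> Result<bool, String> {
        Ok(self.keys.contains(key))
    }
}

/// Test double that always fails, for exercising the error path.
struct FailingStore;

impl KeyStore for FailingStore {
    fn exists(&self, _key: &str) -> Result<bool, String> {
        Err("connection refused".to_string())
    }
}

/// Condition logic written against the trait rather than a concrete client.
fn key_exists(store: &dyn KeyStore, prefix: &str, input: &str) -> Result<bool, String> {
    store.exists(&format!("{prefix}:{input}"))
}
```

With this split, the positive case, the negative case and the error case can all be covered without external services.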
Example: Complex Condition
use cortex_ai::{Condition, ConditionFuture, FlowComponent};

#[derive(Clone)]
struct CombinedCondition<T> {
    conditions: Vec<T>,
    require_all: bool,
}

impl<T> CombinedCondition<T>
where
    T: Condition + Clone,
    T::Input: Clone,
    T::Output: Clone,
    T::Error: From<&'static str>,
{
    fn new(conditions: Vec<T>, require_all: bool) -> Self {
        Self {
            conditions,
            require_all,
        }
    }
}

impl<T> FlowComponent for CombinedCondition<T>
where
    T: Condition + Clone,
    T::Input: Clone,
    T::Output: Clone,
    T::Error: From<&'static str>,
{
    type Input = T::Input;
    type Output = T::Output;
    type Error = T::Error;
}

impl<T> Condition for CombinedCondition<T>
where
    T: Condition + Clone,
    T::Input: Clone,
    T::Output: Clone,
    T::Error: From<&'static str>,
{
    fn evaluate(&self, input: Self::Input) -> ConditionFuture<'_, Self::Output, Self::Error> {
        let conditions = self.conditions.clone();
        let require_all = self.require_all;
        let input_clone = input.clone();
        Box::pin(async move {
            // Evaluate every condition in order, stopping at the first error.
            let mut results = Vec::new();
            for condition in conditions {
                let (result, _) = condition.evaluate(input_clone.clone()).await?;
                results.push(result);
            }

            // require_all: every condition must pass; otherwise any one suffices.
            let final_result = if require_all {
                results.iter().all(|&r| r)
            } else {
                results.iter().any(|&r| r)
            };
            Ok((final_result, Some(input)))
        })
    }
}
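The combining step at the end of `evaluate` is worth isolating, because its behavior on an empty list is easy to overlook. The standard-library sketch below mirrors that step; `combine` is a hypothetical helper name.

```rust
/// Combines boolean check results the way the `require_all` flag does:
/// `true` means every check must pass, `false` means at least one must pass.
fn combine(results: &[bool], require_all: bool) -> bool {
    if require_all {
        results.iter().all(|&r| r)
    } else {
        results.iter().any(|&r| r)
    }
}
```

With no conditions at all, `all` returns `true` and `any` returns `false`, so an empty strict combination passes everything and an empty relaxed combination passes nothing. Callers that consider an empty list a configuration error may want to reject it up front.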
Using Combined Conditions
Here's how to use multiple conditions together:
use cortex_ai::{Flow, FlowComponent, Condition, ConditionFuture};

#[derive(Clone)]
struct RedisExistsCondition {
    // Same definition as in the Redis example above.
}

#[derive(Clone)]
struct RateLimitCondition {
    max_requests: u32,
    window_secs: u64,
}

impl FlowComponent for RateLimitCondition {
    type Input = String;
    type Output = String;
    type Error = RedisConditionError;
}

impl Condition for RateLimitCondition {
    fn evaluate(&self, input: Self::Input) -> ConditionFuture<'_, Self::Output, Self::Error> {
        Box::pin(async move {
            // Rate limiting logic is omitted; this placeholder always passes.
            Ok((true, Some(input)))
        })
    }
}

#[derive(Clone)]
struct WhitelistCondition {
    allowed_prefixes: Vec<String>,
}

impl FlowComponent for WhitelistCondition {
    type Input = String;
    type Output = String;
    type Error = RedisConditionError;
}

impl Condition for WhitelistCondition {
    fn evaluate(&self, input: Self::Input) -> ConditionFuture<'_, Self::Output, Self::Error> {
        // Compute the result up front so the future does not borrow `self`.
        let allowed = self.allowed_prefixes.iter()
            .any(|prefix| input.starts_with(prefix));
        Box::pin(async move {
            Ok((allowed, Some(input)))
        })
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_condition = RedisExistsCondition::new(
        "redis://localhost:6379",
        "myapp:cache",
    )?;
    let rate_limit = RateLimitCondition {
        max_requests: 100,
        window_secs: 60,
    };
    let whitelist = WhitelistCondition {
        allowed_prefixes: vec!["user_".to_string(), "admin_".to_string()],
    };

    // Note: CombinedCondition<T> stores a Vec<T>, so every element must have
    // the same concrete type T. Conditions of different types need a common
    // wrapper type before they can be combined like this.
    let strict_condition = CombinedCondition::new(
        vec![
            // Clone here because the same conditions are reused below.
            redis_condition.clone(),
            rate_limit.clone(),
            whitelist.clone(),
        ],
        true, // require all conditions to pass
    );
    let relaxed_condition = CombinedCondition::new(
        vec![
            redis_condition,
            rate_limit,
            whitelist,
        ],
        false, // any passing condition is enough
    );

    let strict_flow = Flow::new()
        .source(MessageSource::new())
        .when(strict_condition)
        .process(AllChecksPassedProcessor::new())
        .otherwise()
        .process(SomeCheckFailedProcessor::new())
        .end();
    let relaxed_flow = Flow::new()
        .source(MessageSource::new())
        .when(relaxed_condition)
        .process(AnyCheckPassedProcessor::new())
        .otherwise()
        .process(AllChecksFailedProcessor::new())
        .end();

    // Run both flows concurrently and surface either flow's error.
    let (strict_result, relaxed_result) = tokio::join!(
        strict_flow.run_stream(strict_shutdown_rx),
        relaxed_flow.run_stream(relaxed_shutdown_rx),
    );
    strict_result?;
    relaxed_result?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use redis::AsyncCommands;

    #[tokio::test]
    async fn test_combined_conditions() {
        let redis_condition = RedisExistsCondition::new(
            "redis://localhost:6379",
            "test",
        ).unwrap();
        let rate_limit = RateLimitCondition {
            max_requests: 100,
            window_secs: 60,
        };
        let whitelist = WhitelistCondition {
            allowed_prefixes: vec!["user_".to_string()],
        };

        let strict = CombinedCondition::new(
            vec![redis_condition.clone(), rate_limit.clone(), whitelist.clone()],
            true,
        );
        let relaxed = CombinedCondition::new(
            vec![redis_condition, rate_limit, whitelist],
            false,
        );

        // Seed the key that the Redis condition looks up.
        let mut conn = redis::Client::open("redis://localhost:6379")
            .unwrap()
            .get_async_connection()
            .await
            .unwrap();
        let _: () = conn.set("test:user_123", "value").await.unwrap();

        // All three conditions pass.
        let result = strict.evaluate("user_123".to_string()).await.unwrap();
        assert!(result.0);

        // Not whitelisted and no Redis key, so the strict check fails.
        let result = strict.evaluate("invalid_456".to_string()).await.unwrap();
        assert!(!result.0);

        // The rate limit placeholder always passes, so the relaxed check succeeds.
        let result = relaxed.evaluate("invalid_456".to_string()).await.unwrap();
        assert!(result.0);
    }
}
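Because a `Vec<T>` holds a single element type, combining checks of different kinds requires a common type for them. One option is an enum with one variant per check. The sketch below shows the idea synchronously with the standard library only; `Check`, its variants and `evaluate_all` are hypothetical and are not part of cortex-ai. Unlike the `CombinedCondition` example, it stops evaluating as soon as the outcome is decided.

```rust
/// Hypothetical wrapper that gives different checks one concrete type,
/// so they can be stored together in a `Vec<Check>`.
#[derive(Clone)]
enum Check {
    Whitelist { allowed_prefixes: Vec<String> },
    MaxLength(usize),
}

impl Check {
    /// Synchronous stand-in for an async `evaluate` call.
    fn evaluate(&self, input: &str) -> bool {
        match self {
            Check::Whitelist { allowed_prefixes } => allowed_prefixes
                .iter()
                .any(|p| input.starts_with(p.as_str())),
            Check::MaxLength(max) => input.len() <= *max,
        }
    }
}

/// Evaluates checks in order and combines the results, stopping early
/// once the outcome can no longer change.
fn evaluate_all(checks: &[Check], input: &str, require_all: bool) -> bool {
    let mut results = checks.iter().map(|c| c.evaluate(input));
    if require_all {
        results.all(|r| r)
    } else {
        results.any(|r| r)
    }
}
```

An enum keeps dispatch static and the set of checks closed; a boxed trait object would be the usual alternative when the set of checks has to stay open.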
Common Use Cases for Combined Conditions
- Access Control:
let access_check = CombinedCondition::new(vec![
    AuthTokenCondition::new(),
    RoleCondition::new(),
    PermissionCondition::new(),
], true);
- Data Validation:
let validation = CombinedCondition::new(vec![
    SchemaValidation::new(),
    BusinessRuleValidation::new(),
    DataIntegrityCheck::new(),
], true);
- Routing Logic:
let routing = CombinedCondition::new(vec![
    CapacityCheck::new(),
    LoadBalancingCheck::new(),
    HealthCheck::new(),
], false);
Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.
Guidelines
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add some amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
License
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.
Acknowledgments
- Inspired by modern data processing pipelines
- Built with Rust's async ecosystem
- Performance-focused design
Contact