drasi-middleware 0.2.5

Drasi Core Middleware
# Drasi Middleware Development Guide

## Introduction

Middleware components are pluggable modules that process incoming source changes before they reach a continuous query. A middleware can transform, enrich, filter, or route source changes, and multiple middlewares can be chained together in a pipeline.
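To make the pipeline idea concrete, here is a minimal std-only sketch. It does not use the real `drasi_core` types: `Change`, `Middleware`, `AddSource`, `DropDeletes` and `run_pipeline` are hypothetical, simplified stand-ins, and the code is synchronous for brevity. It only illustrates that each middleware may emit zero, one, or many changes, and that every output of one stage becomes an input of the next.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a source change (not the drasi_core type).
#[derive(Debug, Clone, PartialEq)]
enum Change {
    Insert { id: String, props: HashMap<String, String> },
    Delete { id: String },
}

/// Hypothetical middleware interface: one change in, zero or more changes out.
trait Middleware {
    fn process(&self, change: Change) -> Vec<Change>;
}

/// An "enrich" stage: adds a fixed `source` property to inserts.
struct AddSource(&'static str);

impl Middleware for AddSource {
    fn process(&self, change: Change) -> Vec<Change> {
        match change {
            Change::Insert { id, mut props } => {
                props.insert("source".to_string(), self.0.to_string());
                vec![Change::Insert { id, props }]
            }
            other => vec![other],
        }
    }
}

/// A "filter" stage: drops deletes by returning an empty Vec.
struct DropDeletes;

impl Middleware for DropDeletes {
    fn process(&self, change: Change) -> Vec<Change> {
        match change {
            Change::Delete { .. } => vec![],
            other => vec![other],
        }
    }
}

/// Runs one change through each stage in order, flattening the outputs.
fn run_pipeline(stages: &[Box<dyn Middleware>], change: Change) -> Vec<Change> {
    let mut current = vec![change];
    for stage in stages {
        current = current.into_iter().flat_map(|c| stage.process(c)).collect();
    }
    current
}

fn main() {
    let stages: Vec<Box<dyn Middleware>> =
        vec![Box::new(AddSource("sensor-a")), Box::new(DropDeletes)];
    let out = run_pipeline(&stages, Change::Insert { id: "n1".into(), props: HashMap::new() });
    println!("{:?}", out);
}
```

Returning a `Vec` rather than a single value is what lets one interface express filtering (empty), transformation (one), and fan-out (many).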

Learn more about [middlewares in Drasi here](https://drasi.io/concepts/middleware/).

This guide explains how to develop custom middleware in Drasi core.

## Core Concepts

### SourceMiddleware and SourceMiddlewareFactory

A middleware in Drasi consists of two main components:

1. **SourceMiddleware**: Implements the processing logic to handle and transform `SourceChange` objects
2. **SourceMiddlewareFactory**: Creates instances of middleware configured with specific parameters
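The split between a middleware and its factory can be sketched with std-only, hypothetical types (`Middleware`, `MiddlewareFactory`, `Registry`, `PrefixerFactory`). The sketch assumes, as the test configuration later in this guide suggests, that a configured `kind` is matched against a factory's `name()`; it is not Drasi's actual registry code.

```rust
use std::collections::HashMap;

/// Hypothetical simplified middleware: transforms a string payload.
trait Middleware {
    fn process(&self, input: &str) -> Vec<String>;
}

/// Hypothetical simplified factory: validates config and builds an instance.
trait MiddlewareFactory {
    fn name(&self) -> String;
    fn create(&self, config: &HashMap<String, String>) -> Result<Box<dyn Middleware>, String>;
}

struct Prefixer {
    prefix: String,
}

impl Middleware for Prefixer {
    fn process(&self, input: &str) -> Vec<String> {
        vec![format!("{}{}", self.prefix, input)]
    }
}

struct PrefixerFactory;

impl MiddlewareFactory for PrefixerFactory {
    fn name(&self) -> String {
        "prefixer".to_string()
    }

    fn create(&self, config: &HashMap<String, String>) -> Result<Box<dyn Middleware>, String> {
        let prefix = config.get("prefix").ok_or("missing required field `prefix`")?;
        Ok(Box::new(Prefixer { prefix: prefix.clone() }))
    }
}

/// Hypothetical registry: factories keyed by `name()`, looked up by a config's `kind`.
struct Registry {
    factories: HashMap<String, Box<dyn MiddlewareFactory>>,
}

impl Registry {
    fn new() -> Self {
        Registry { factories: HashMap::new() }
    }

    fn register(&mut self, factory: Box<dyn MiddlewareFactory>) {
        self.factories.insert(factory.name(), factory);
    }

    fn build(&self, kind: &str, config: &HashMap<String, String>) -> Result<Box<dyn Middleware>, String> {
        let factory = self
            .factories
            .get(kind)
            .ok_or_else(|| format!("unknown middleware kind `{}`", kind))?;
        factory.create(config)
    }
}

fn main() {
    let mut registry = Registry::new();
    registry.register(Box::new(PrefixerFactory));
    let mut config = HashMap::new();
    config.insert("prefix".to_string(), "mw:".to_string());
    let mw = registry.build("prefixer", &config).expect("valid config");
    println!("{:?}", mw.process("hello")); // ["mw:hello"]
}
```

Keeping validation in the factory means a bad configuration fails once, at setup time, instead of on every processed change.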

## Getting Started

To create a new middleware, you need to:

1. Create a new Rust module under the `middleware/src` directory
2. Implement both the `SourceMiddleware` trait and a factory that implements the `SourceMiddlewareFactory` trait
3. Write unit tests for your middleware

## Step-by-Step Guide

### 1. Create a Middleware Module

Create a new directory under the `middleware/src` path with a descriptive name for your middleware:
```
middleware/src/my_middleware/
├── mod.rs
└── tests.rs
```

### 2. Implement the SourceMiddleware Trait

In your `mod.rs` file, implement the `SourceMiddleware` trait:

```rust
use std::sync::Arc;
use async_trait::async_trait;
use drasi_core::{
    interface::{
        ElementIndex,
        MiddlewareError,
        MiddlewareSetupError,
        SourceMiddleware,
        SourceMiddlewareFactory
    },
    models::{Element, SourceChange, SourceMiddlewareConfig},
};

pub struct MyMiddleware {
    // Configuration fields for your middleware
}

#[async_trait]
impl SourceMiddleware for MyMiddleware {
    async fn process(
        &self,
        source_change: SourceChange,
        element_index: &dyn ElementIndex,
    ) -> Result<Vec<SourceChange>, MiddlewareError> {
        // Process the source change and return transformed changes
        match source_change {
            SourceChange::Insert { mut element } => {
                // Transform the element being inserted
                Ok(vec![SourceChange::Insert { element }])
            },
            SourceChange::Update { mut element } => {
                // Transform the element being updated
                Ok(vec![SourceChange::Update { element }])
            },
            SourceChange::Delete { metadata } => {
                // Handle deletion
                Ok(vec![SourceChange::Delete { metadata }])
            },
            SourceChange::Future { .. } => {
                // Pass through future events, or transform if needed
                Ok(vec![source_change])
            },
        }
    }
}
```
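As a filled-in illustration of the `match` above, the std-only sketch below implements a hypothetical "rename property" middleware. `SimpleChange`, `SimpleElement` and `RenameProperty` are simplified stand-ins that mirror the four `SourceChange` variants; they are not the `drasi_core` types, and the method is synchronous.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified mirror of the four `SourceChange` variants.
#[derive(Debug, Clone, PartialEq)]
enum SimpleChange {
    Insert { element: SimpleElement },
    Update { element: SimpleElement },
    Delete { id: String },
    Future { id: String },
}

/// Hypothetical element: an id plus string-valued properties.
#[derive(Debug, Clone, PartialEq)]
struct SimpleElement {
    id: String,
    properties: BTreeMap<String, String>,
}

/// Renames one property on inserts and updates (hypothetical example middleware).
struct RenameProperty {
    from: String,
    to: String,
}

impl RenameProperty {
    fn rename(&self, mut element: SimpleElement) -> SimpleElement {
        if let Some(value) = element.properties.remove(&self.from) {
            element.properties.insert(self.to.clone(), value);
        }
        element
    }

    /// Same shape as `SourceMiddleware::process`: one change in, a Vec of changes out.
    fn process(&self, change: SimpleChange) -> Vec<SimpleChange> {
        match change {
            SimpleChange::Insert { element } => {
                vec![SimpleChange::Insert { element: self.rename(element) }]
            }
            SimpleChange::Update { element } => {
                vec![SimpleChange::Update { element: self.rename(element) }]
            }
            // Deletes and future events carry no properties to rename: pass them through.
            other @ (SimpleChange::Delete { .. } | SimpleChange::Future { .. }) => vec![other],
        }
    }
}

fn main() {
    let mw = RenameProperty { from: "temp".into(), to: "temperature".into() };
    let mut properties = BTreeMap::new();
    properties.insert("temp".to_string(), "21.5".to_string());
    let out = mw.process(SimpleChange::Insert {
        element: SimpleElement { id: "n1".into(), properties },
    });
    println!("{:?}", out);
}
```

Matching every variant explicitly, rather than using a catch-all first, makes the compiler point at this code if the set of change kinds ever grows.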

### 3. Define Configuration Structure
Create a configuration structure for your middleware:

```rust
use serde::Deserialize;

#[derive(Debug, Clone, Deserialize)]
pub struct MyMiddlewareConfig {
    // Configuration fields with appropriate types
    pub field_one: String,
    #[serde(default)]
    pub optional_field: bool,
}
```

### 4. Implement the SourceMiddlewareFactory
Create a factory that knows how to instantiate your middleware:
```rust
pub struct MyMiddlewareFactory {}

impl MyMiddlewareFactory {
    pub fn new() -> Self {
        MyMiddlewareFactory {}
    }
}

impl Default for MyMiddlewareFactory {
    fn default() -> Self {
        Self::new()
    }
}

impl SourceMiddlewareFactory for MyMiddlewareFactory {
    fn name(&self) -> String {
        "my_middleware".to_string()
    }

    fn create(
        &self,
        config: &SourceMiddlewareConfig,
    ) -> Result<Arc<dyn SourceMiddleware>, MiddlewareSetupError> {
        // Parse configuration
        let my_config: MyMiddlewareConfig =
            serde_json::from_value(serde_json::Value::Object(config.config.clone())).map_err(
                |e| MiddlewareSetupError::InvalidConfiguration(format!("Invalid configuration: {}", e)),
            )?;

        // Validate configuration
        if my_config.field_one.is_empty() {
            return Err(MiddlewareSetupError::InvalidConfiguration(
                "field_one cannot be empty".to_string(),
            ));
        }

        // Create middleware instance
        Ok(Arc::new(MyMiddleware {
            // Initialize with parsed config
        }))
    }
}
```
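The factory above performs two kinds of checks: structural ones that deserialization enforces (required fields present, correct types) and semantic ones serde cannot express (a non-empty `field_one`). The std-only sketch below hand-rolls both steps so they are easy to see. `SetupError` is a hypothetical stand-in for `MiddlewareSetupError`, and a flat `HashMap<&str, &str>` stands in for the JSON configuration object.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `MiddlewareSetupError`.
#[derive(Debug, PartialEq)]
enum SetupError {
    InvalidConfiguration(String),
}

/// Same fields as `MyMiddlewareConfig` in this guide.
#[derive(Debug, PartialEq)]
struct MyMiddlewareConfig {
    field_one: String,
    optional_field: bool,
}

/// Hand-rolled equivalent of "deserialize, then validate".
fn parse_config(raw: &HashMap<&str, &str>) -> Result<MyMiddlewareConfig, SetupError> {
    // Step 1 (what serde enforces): required keys exist and values have the right type.
    let field_one = raw
        .get("field_one")
        .ok_or_else(|| SetupError::InvalidConfiguration("missing field `field_one`".to_string()))?
        .to_string();
    // Like `#[serde(default)]`: an absent key falls back to `bool::default()`, i.e. false.
    let optional_field = match raw.get("optional_field") {
        None => bool::default(),
        Some(v) => v
            .parse::<bool>()
            .map_err(|e| SetupError::InvalidConfiguration(format!("optional_field: {}", e)))?,
    };

    // Step 2 (what serde cannot express): semantic checks such as non-empty strings.
    if field_one.is_empty() {
        return Err(SetupError::InvalidConfiguration(
            "field_one cannot be empty".to_string(),
        ));
    }
    Ok(MyMiddlewareConfig { field_one, optional_field })
}

fn main() {
    let mut raw = HashMap::new();
    raw.insert("field_one", "test_value");
    println!("{:?}", parse_config(&raw));
}
```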

### 5. Write Tests
In your `tests.rs` file, write unit tests for your middleware. Declare the file from `mod.rs` with `#[cfg(test)] mod tests;` so it is compiled only for tests. Because `tests.rs` is already the `tests` module, do not wrap its contents in another `mod tests { ... }` block; if you do, `use super::*` resolves to the `tests` module itself instead of your middleware module, and `MyMiddlewareFactory` is no longer in scope.

```rust
// tests.rs: compiled only under `cargo test`, via `#[cfg(test)] mod tests;` in mod.rs
use std::sync::Arc;

use drasi_core::{
    in_memory_index::in_memory_element_index::InMemoryElementIndex,
    interface::SourceMiddlewareFactory,
    models::{
        Element, ElementMetadata, ElementReference, SourceChange, SourceMiddlewareConfig,
    },
};
use serde_json::json;

// Brings MyMiddlewareFactory and the SourceMiddleware trait (needed for `process`) into scope.
use super::*;

#[tokio::test]
async fn test_my_middleware_basic() {
    let factory = MyMiddlewareFactory::new();
    let config = json!({
        "field_one": "test_value",
        "optional_field": true
    });

    let element_index = Arc::new(InMemoryElementIndex::new());
    let mw_config = SourceMiddlewareConfig {
        name: "test".into(),
        kind: "my_middleware".into(),
        config: config.as_object().unwrap().clone(),
    };

    let middleware = factory.create(&mw_config).unwrap();

    // Create a test SourceChange
    let source_change = SourceChange::Insert {
        element: Element::Node {
            metadata: ElementMetadata {
                reference: ElementReference::new("test", "node1"),
                labels: vec!["TestNode".into()].into(),
                effective_from: 0,
            },
            properties: json!({
                "test_property": "test_value"
            })
            .into(),
        },
    };

    // Process the source change
    let result = middleware
        .process(source_change, element_index.as_ref())
        .await
        .unwrap();

    // Assert expected behavior
    assert_eq!(result.len(), 1);
    // Add specific assertions based on your middleware's expected behavior
}

#[tokio::test]
async fn test_invalid_config() {
    let factory = MyMiddlewareFactory::new();
    let config = json!({}); // Missing required field `field_one`

    let mw_config = SourceMiddlewareConfig {
        name: "test".into(),
        kind: "my_middleware".into(),
        config: config.as_object().unwrap().clone(),
    };

    // Should return an error for invalid config
    assert!(factory.create(&mw_config).is_err());
}
```

## Best Practices

### Error Handling
- Use descriptive error messages in `MiddlewareSetupError` values
- Log errors appropriately with information that helps debugging
- Validate configuration before creating middleware instances

```rust
// Inside `create`, after parsing `my_config`:
if my_config.field_one.is_empty() {
    // Log enough context to identify the offending setting (requires the `log` crate).
    log::warn!("Invalid my_middleware config: field_one is empty");
    return Err(MiddlewareSetupError::InvalidConfiguration(
        "field_one cannot be empty".to_string(),
    ));
}
```
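A descriptive setup error usually names the field, the offending value, and the rule it broke. The std-only sketch below shows one way to structure that; `InvalidConfiguration` here is a hypothetical struct (not the `MiddlewareSetupError` variant), `batch_size` is a made-up setting, and `eprintln!` stands in for `log::warn!` to keep the example free of external crates.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical structured error carrying field, value, and violated rule.
#[derive(Debug)]
struct InvalidConfiguration {
    field: &'static str,
    value: String,
    reason: &'static str,
}

impl fmt::Display for InvalidConfiguration {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Invalid configuration: `{}` = {:?} ({})", self.field, self.value, self.reason)
    }
}

impl Error for InvalidConfiguration {}

/// Validates a hypothetical `batch_size` setting.
fn validate_batch_size(raw: &str) -> Result<u32, InvalidConfiguration> {
    let err = |reason: &'static str| InvalidConfiguration {
        field: "batch_size",
        value: raw.to_string(),
        reason,
    };
    let n: u32 = raw.parse().map_err(|_| err("must be a non-negative integer"))?;
    if n == 0 || n > 10_000 {
        return Err(err("must be between 1 and 10000"));
    }
    Ok(n)
}

fn main() {
    match validate_batch_size("0") {
        Ok(n) => println!("batch_size = {}", n),
        // In a real middleware this is where `log::warn!` would go.
        Err(e) => eprintln!("{}", e),
    }
}
```

Keeping the pieces as separate fields lets the same error be formatted for logs and matched on in tests.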

### Configuration Design
- Make your middleware configurable with reasonable defaults
- Use `#[serde(default)]` for optional fields
- Consider namespacing configuration fields (e.g., with prefixes)
- Document the configuration options clearly

# Example Middlewares

## Map Middleware
The Map middleware uses JSONPath expressions to select data from source changes and remap it into new element properties.

Key features:
- Selects data using JSONPath expressions
- Maps input properties to output properties
- Supports different mapping operations (insert, update, delete)
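The select-then-map idea can be sketched without a JSONPath library. The std-only example below supports only a tiny dotted subset (`$.a.b`), not real JSONPath, and its `Value` type, `select`, `map_properties` and `obj` helpers are hypothetical; it does not reflect the Map middleware's actual configuration format or its insert/update/delete operations.

```rust
use std::collections::BTreeMap;

/// Hypothetical, minimal JSON-like value (no arrays, to keep the sketch short).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Str(String),
    Obj(BTreeMap<String, Value>),
}

/// Convenience constructor for nested objects.
fn obj(pairs: Vec<(&str, Value)>) -> Value {
    Value::Obj(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}

/// Resolves a dotted path such as `$.payload.after.name`.
/// Only `$` followed by `.field` steps is supported.
fn select<'a>(root: &'a Value, path: &str) -> Option<&'a Value> {
    let rest = path.strip_prefix('$')?;
    let mut current = root;
    for step in rest.split('.').filter(|s| !s.is_empty()) {
        current = match current {
            Value::Obj(map) => map.get(step)?,
            _ => return None,
        };
    }
    Some(current)
}

/// Builds output properties from (output_name, input_path) pairs; unmatched paths are skipped.
fn map_properties(input: &Value, mappings: &[(&str, &str)]) -> BTreeMap<String, Value> {
    mappings
        .iter()
        .filter_map(|(out, path)| select(input, path).map(|v| (out.to_string(), v.clone())))
        .collect()
}

fn main() {
    let input = obj(vec![(
        "payload",
        obj(vec![("after", obj(vec![("name", Value::Str("pump-7".into()))]))]),
    )]);
    let out = map_properties(
        &input,
        &[("deviceName", "$.payload.after.name"), ("missing", "$.payload.before.name")],
    );
    println!("{:?}", out); // {"deviceName": Str("pump-7")}
}
```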

## Unwind Middleware
The Unwind middleware flattens arrays into individual elements.

Key features:
- Extracts array elements into separate nodes
- Creates relationships between parent and extracted elements
- Configurable with JSONPath selectors
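Unwinding can be pictured as turning each array item into its own node plus a relation back to the parent. The std-only sketch below uses a hypothetical `GraphElement` type, assumes the relation points from parent to child, and invents a `{parent}-{label}-{index}` id scheme purely for illustration; none of these details are taken from the Unwind middleware itself.

```rust
/// Hypothetical simplified graph elements (not drasi_core's `Element`).
#[derive(Debug, Clone, PartialEq)]
enum GraphElement {
    Node { id: String, label: String, value: String },
    Relation { id: String, label: String, from: String, to: String },
}

/// Emits one child node and one parent-to-child relation per array item.
fn unwind(parent_id: &str, child_label: &str, rel_label: &str, items: &[&str]) -> Vec<GraphElement> {
    let mut out = Vec::with_capacity(items.len() * 2);
    for (i, item) in items.iter().enumerate() {
        // Illustrative id scheme: deterministic, so re-processing yields the same ids.
        let child_id = format!("{}-{}-{}", parent_id, child_label, i);
        out.push(GraphElement::Node {
            id: child_id.clone(),
            label: child_label.to_string(),
            value: item.to_string(),
        });
        out.push(GraphElement::Relation {
            id: format!("{}-rel", child_id),
            label: rel_label.to_string(),
            from: parent_id.to_string(),
            to: child_id,
        });
    }
    out
}

fn main() {
    for e in unwind("order1", "LineItem", "HAS_ITEM", &["apple", "pear"]) {
        println!("{:?}", e);
    }
}
```

Deterministic ids matter here: a later update or delete of the parent must be able to address the same child elements again.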

## Decoder Middleware
The Decoder middleware decodes encoded strings.

Key features:
- Supports decoding from several formats, such as base64 and hex
- Supports stripping quotes from encoded strings
- Configurable output property prefix
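The three features combine as "strip quotes, decode, write under a prefixed name". The std-only sketch below covers only hex (base64 would need more code or a crate). The function names and the `prefix + property` naming rule are hypothetical, not the Decoder middleware's actual configuration keys.

```rust
/// Decodes a hex string (two hex digits per byte) into UTF-8 text.
fn decode_hex(input: &str) -> Result<String, String> {
    if !input.chars().all(|c| c.is_ascii_hexdigit()) {
        return Err(format!("not a hex string: {:?}", input));
    }
    if input.len() % 2 != 0 {
        return Err(format!("odd-length hex string ({} chars)", input.len()));
    }
    // All chars are ASCII hex digits here, so byte-index slicing is safe.
    let bytes = (0..input.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&input[i..i + 2], 16).map_err(|e| e.to_string()))
        .collect::<Result<Vec<u8>, String>>()?;
    String::from_utf8(bytes).map_err(|e| e.to_string())
}

/// Removes one pair of surrounding double quotes, if present.
fn strip_quotes(s: &str) -> &str {
    s.strip_prefix('"').and_then(|t| t.strip_suffix('"')).unwrap_or(s)
}

/// Returns (output_property_name, decoded_value) for one encoded property.
fn decode_property(property: &str, raw: &str, prefix: &str, strip: bool) -> Result<(String, String), String> {
    let input = if strip { strip_quotes(raw) } else { raw };
    let decoded = decode_hex(input)?;
    Ok((format!("{}{}", prefix, property), decoded))
}

fn main() {
    // "48656c6c6f" is the hex encoding of "Hello", here wrapped in quotes.
    let result = decode_property("payload", "\"48656c6c6f\"", "decoded_", true);
    println!("{:?}", result); // Ok(("decoded_payload", "Hello"))
}
```

Writing the decoded value under a new, prefixed name (rather than overwriting the original) keeps the raw value available to queries.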