<div align="center">
# mocra
**A distributed, event-driven crawling and data collection framework for Rust.**
[![Crates.io](https://img.shields.io/crates/v/mocra.svg)](https://crates.io/crates/mocra)
[![Documentation](https://docs.rs/mocra/badge.svg)](https://docs.rs/mocra)
[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)](LICENSE)
[![Rust](https://img.shields.io/badge/rust-stable-orange.svg)](https://www.rust-lang.org)
</div>
---
mocra is a Rust framework for building scalable data collection pipelines. It models crawling as **queue-driven processing stages** orchestrated by a **DAG execution engine**, with automatic scaling from single-process to distributed clusters.
## Features
- **DAG-based module system** — define linear chains or custom fan-out/fan-in graphs
- **Queue-driven pipeline** — task → request → download → parse → store, fully decoupled
- **Auto-scaling runtime** — single-node (in-memory) or distributed (Redis/Kafka) with zero code changes
- **Bounded concurrency** — semaphore-controlled worker pools with pause/resume/shutdown
- **Middleware pipeline** — download, data transformation, and storage middleware with weight-based ordering
- **Built-in control plane** — HTTP API for health, metrics, pause/resume, task injection, and DLQ inspection
- **Prometheus metrics** — unified `mocra_*` metric families for throughput, latency, errors, and backlog
- **Cron scheduling** — periodic task execution with cron expressions
- **Error recovery** — policy-driven retry, fallback gates, circuit breakers, and dead-letter queues
## Quick Start
Add to your `Cargo.toml`:
```toml
[dependencies]
mocra = "0.2"
async-trait = "0.1"
tokio = { version = "1", features = ["full"] }
serde_json = "1"
futures = "0.3"
```
Write a module:
```rust
use std::sync::Arc;
use async_trait::async_trait;
use futures::stream;
use serde_json::{Map, Value};
use mocra::prelude::*;
use mocra::common::model::login_info::LoginInfo;
use mocra::common::state::State;
use mocra::engine::engine::Engine;
struct FetchNode;
#[async_trait]
impl ModuleNodeTrait for FetchNode {
async fn generate(
&self,
_config: Arc<ModuleConfig>,
_params: Map<String, Value>,
_login_info: Option<LoginInfo>,
) -> Result<SyncBoxStream<'static, Request>> {
let req = Request::new("https://httpbin.org/get", RequestMethod::Get.as_ref());
Ok(Box::pin(stream::iter(vec![req])))
}
async fn parser(
&self, response: Response, _config: Option<Arc<ModuleConfig>>,
) -> Result<TaskOutputEvent> {
println!("Status: {}, Body: {} bytes", response.status, response.body.len());
Ok(TaskOutputEvent::default())
}
}
struct MyModule;
#[async_trait]
impl ModuleTrait for MyModule {
fn name(&self) -> String { "my_module".into() }
fn version(&self) -> i32 { 1 }
fn should_login(&self) -> bool { false }
fn default_arc() -> Arc<dyn ModuleTrait> { Arc::new(Self) }
async fn add_step(&self) -> Vec<Arc<dyn ModuleNodeTrait>> {
vec![Arc::new(FetchNode)]
}
}
#[tokio::main]
async fn main() {
let state = Arc::new(State::new("config.toml").await);
let engine = Engine::new(state, None).await;
engine.register_module(MyModule::default_arc()).await;
engine.run().await;
}
```
Create `config.toml`:
```toml
name = "my_crawler"
[db]
url = "sqlite://data/crawler.db?mode=rwc"
[cache]
ttl = 60
[download_config]
timeout = 30
rate_limit = 10
[crawler]
request_max_retries = 3
[channel_config]
capacity = 5000
```
Run:
```bash
cargo run
```
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Engine │
│ │
│ TaskEvent ──▶ generate() ──▶ download() ──▶ parser() │
│ │ │ │ │ │
│ [Task Q] [Request Q] [Response Q] [Parser Q] │
│ │ │
│ ┌────────────┬──────┘ │
│ ▼ ▼ │
│ [Data Store] [Next Node] │
│ [Error Q → DLQ] │
└─────────────────────────────────────────────────────────┘
```
Each stage is decoupled by a message queue. Queues are local Tokio channels in single-node mode, or Redis Streams / Kafka in distributed mode — **same code, zero changes**.
## DAG Execution
Define complex pipelines with fan-out and fan-in:
```rust
async fn dag_definition(&self) -> Option<ModuleDagDefinition> {
Some(ModuleDagDefinition {
nodes: vec![
ModuleDagNodeDef { node_id: "start".into(), node: Arc::new(StartNode), ..Default::default() },
ModuleDagNodeDef { node_id: "branch_a".into(), node: Arc::new(BranchA), ..Default::default() },
ModuleDagNodeDef { node_id: "branch_b".into(), node: Arc::new(BranchB), ..Default::default() },
ModuleDagNodeDef { node_id: "merge".into(), node: Arc::new(MergeNode), ..Default::default() },
],
edges: vec![
ModuleDagEdgeDef { from: "start".into(), to: "branch_a".into() },
ModuleDagEdgeDef { from: "start".into(), to: "branch_b".into() },
ModuleDagEdgeDef { from: "branch_a".into(), to: "merge".into() },
ModuleDagEdgeDef { from: "branch_b".into(), to: "merge".into() },
],
entry_nodes: vec!["start".into()],
..Default::default()
})
}
```
```
┌── branch_a ──┐
start ─┤ ├── merge
└── branch_b ──┘
```
## Single-Node vs Distributed
| | Single-Node | Distributed |
|---|---|---|
| **Config** | No `cache.redis` | Add `cache.redis` |
| **Queues** | Tokio mpsc (in-memory) | Redis Streams or Kafka |
| **Cache** | In-memory | Redis |
| **Locks** | Local mutex | Redis distributed locks |
| **Workers** | 1 process | N processes, same binary |
| **Code changes** | None | None |
Switch to distributed by adding Redis to your config:
```toml
[cache.redis]
url = "redis://localhost:6379"
```
## Documentation
| Guide | Description |
|---|---|
| [Getting Started](docs/getting-started.md) | Installation and first module |
| [Architecture](docs/architecture.md) | System design and pipeline internals |
| [Module Development](docs/module-development.md) | ModuleTrait, ModuleNodeTrait, data passing |
| [DAG Guide](docs/dag-guide.md) | DAG definition, fan-out/fan-in, advance gates |
| [Middleware](docs/middleware-guide.md) | Download, data, and storage middleware |
| [Configuration](docs/configuration.md) | Full TOML configuration reference |
| [API Reference](docs/api-reference.md) | HTTP control plane endpoints |
| [Deployment](docs/deployment.md) | Single-node, distributed, monitoring |
## Examples
See the [`simple/`](simple/) directory for runnable examples:
- [`simple/module_node_trait_dag.rs`](simple/module_node_trait_dag.rs) — Fan-out/fan-in DAG module
## Monitoring
```bash
# Start Prometheus + Grafana
docker compose -f docker-compose.monitoring.yml up -d
# Prometheus: http://localhost:9090
# Grafana: http://localhost:3000
# Metrics: http://localhost:8080/metrics
```
## License
Licensed under either of:
- MIT license
- Apache License, Version 2.0
at your option.