recoco 0.1.0

ReCoco is an all-Rust fork of CocoIndex with greater flexibility.
<!--
SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
SPDX-FileContributor: Adam Poulemanos <adam@knit.li>

SPDX-License-Identifier: Apache-2.0
-->

<div align="center">
  <img src="assets/recoco.webp" alt="ReCoco Logo" width="200"/>

  # ReCoco

  **Incremental ETL and Data Processing Framework for Rust**

  [![Crates.io](https://img.shields.io/crates/v/recoco.svg)](https://crates.io/crates/recoco)
  [![Documentation](https://docs.rs/recoco/badge.svg)](https://docs.rs/recoco)
  [![CI](https://github.com/knitli/recoco/actions/workflows/ci.yml/badge.svg)](https://github.com/knitli/recoco/actions/workflows/ci.yml)
  [![MSRV](https://img.shields.io/badge/MSRV-1.89-blue.svg)](https://blog.rust-lang.org/2025/02/20/Rust-1.89.0.html)
  [![License](https://img.shields.io/badge/license-Apache--2.0-blue.svg)](LICENSE)
  [![REUSE Compliance](https://img.shields.io/badge/reuse-compliant-brightgreen)](https://reuse.software/)

</div>

---

**[ReCoco](https://github.com/knitli/recoco)** is a pure Rust fork of the excellent [CocoIndex](https://github.com/cocoindex-io/cocoindex), a high-performance, incremental ETL and data processing framework.

## 📑 Table of Contents

- [Why Fork?](#why-fork)
- [How ReCoco is Different](#how-recoco-is-different-from-cocoindex)
- [Key Features](#-key-features)
- [Use Cases](#-use-cases)
- [Installation](#installation)
- [Quick Start](#-quick-start)
- [Examples](#examples)
- [Architecture](#️-architecture)
- [Development](#️-development)
- [Contributing](#-contributing)
- [Relationship to CocoIndex](#-relationship-to-cocoindex)
- [License](#-license)

## Why Fork?

I decided to create a Rust-only fork of CocoIndex for a couple of reasons:

1. **CocoIndex is not a Rust library.** Its core is written in Rust, but it does not expose a Rust API, and its packaging, documentation, and examples focus only on *Python*. What it does expose is a more limited API, reached through its Rust-based Python extensions. It isn't even released on crates.io.

2. **CocoIndex is heavy.** CocoIndex has several very heavy dependencies, most of which you probably don't need, including Google/AWS/Azure components, Qdrant/Postgres/Neo4j, and more.

For [Knitli](https://knitli.com), I particularly needed dependency control. I wanted to use CocoIndex as an ETL engine for [Thread](https://github.com/knitli/thread/), but Thread needs to be edge-deployable. The dependencies were way too heavy and would never compile to WASM. Thread, of course, is also a Rust project, so pulling in a lot of Python dependencies didn't make sense for me.

> [!NOTE]
> Knitli and ReCoco have no official relationship with CocoIndex, and this project is not endorsed by them. **We will contribute as much as we can upstream**; our [contribution guidelines](CONTRIBUTING.md) encourage you to submit PRs and issues affecting shared code upstream to help both projects.

## How ReCoco is Different from CocoIndex

1. **ReCoco fully exposes a Rust API.** You can use ReCoco to support your Rust ETL projects directly. **Build on it.**

2. **Every target, source, and function is independently feature-gated. Use only what you want.**

We will regularly merge in upstream fixes and changes.

## ✨ Key Features

### Unique to ReCoco

- 🦀 **Pure Rust**: No Python dependencies, interpreters, or build tools required
- 🤏 **Granular dependency selection**: Select and install only the features you need for your project
- 🚀 **Additional optimizations**: Extra compile-time optimizations and some faster alternatives (e.g., `blake3` instead of `blake2`) to make ReCoco as fast as possible
- 📦 **Workspace Structure**: Clean separation into `recoco`, `recoco-utils`, and `recoco-splitters` crates

### CocoIndex and ReCoco

- ⚡ **Incremental Processing**: Built on a dataflow engine that processes only changed data
- 🎯 **Modular Architecture**: Feature-gated sources, targets, and functions; use only what you need
- 🔌 **Rich Connector Ecosystem**:
  - **Sources**: Local Files, PostgreSQL, S3, Azure Blob, Google Drive
  - **Targets**: PostgreSQL, Qdrant, Neo4j, Kùzu
  - **Functions**: Text splitting, LLM embedding and calling, embedding generation (SentenceTransformers), JSON parsing, language detection
- 🚀 **Async API**: Fully async/await compatible API based on Tokio
- 🔄 **Data Lineage Tracking**: Automatic tracking of data dependencies for smart incremental updates

## 🎯 Use Cases

ReCoco, like CocoIndex, enables scalable data pipelines with intelligent incremental processing for many use cases, including:

- **RAG (Retrieval-Augmented Generation) Pipelines**: Ingest documents, split them intelligently, generate embeddings, and store in vector databases
- **ETL Workflows**: Extract data from various sources, transform it, and load into databases or data warehouses
- **Document Processing**: Parse, chunk, and extract information from large document collections
- **Data Synchronization**: Keep data synchronized across multiple systems with automatic change detection
- **Custom Data Transformations**: Build domain-specific data processing pipelines with custom operations

## Installation

Add `recoco` to your `Cargo.toml`. Since ReCoco uses a modular feature system, you should enable only the features you need.

```toml
[dependencies]
recoco = { version = "0.1.0", default-features = false, features = ["source-local-file", "function-split"] }
```

### Available Features

#### 📥 Sources (Data Ingestion)

| Feature | Description | Dependencies |
|---------|-------------|--------------|
| `source-local-file` | Read files from local filesystem | ✅ Default - lightweight |
| `source-postgres` | Read from PostgreSQL (Change Data Capture) | 📦 PostgreSQL driver |
| `source-s3` | Read from Amazon S3 | 📦 AWS SDK |
| `source-azure` | Read from Azure Blob Storage | 📦 Azure SDK |
| `source-gdrive` | Read from Google Drive | 📦 Google APIs |

#### 📤 Targets (Data Persistence)

| Feature | Description | Dependencies |
|---------|-------------|--------------|
| `target-postgres` | Write to PostgreSQL | 📦 PostgreSQL driver |
| `target-qdrant` | Write to Qdrant Vector DB | 📦 Qdrant client |
| `target-neo4j` | Write to Neo4j Graph DB | 📦 Neo4j driver |
| `target-kuzu` | Write to Kùzu embedded Graph DB | 📦 Kùzu bindings |

#### ⚙️ Functions (Data Transformations)

| Feature | Description | Dependencies |
|---------|-------------|--------------|
| `function-split` | Text splitting utilities (recursive, semantic) | ✅ Lightweight |
| `function-embed` | Text embedding (OpenAI, Vertex AI) | 📦 LLM APIs |
| `function-extract-llm` | Information extraction via LLM | 📦 LLM APIs |
| `function-detect-lang` | Programming language detection | ✅ Lightweight |
| `function-json` | JSON/JSON5 parsing and manipulation | ✅ Lightweight |

#### 📦 Feature Bundles

| Feature | Description |
|---------|-------------|
| `all-sources` | Enable all source connectors |
| `all-targets` | Enable all target connectors |
| `all-functions` | Enable all transform functions |
| `full` | Enable everything (⚠️ heavy dependencies) |
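If your own crate uses ReCoco optionally, one common Cargo pattern is to forward your crate's features to ReCoco's (for example, a hypothetical `split = ["recoco/function-split"]` entry under your `[features]` table) and gate the dependent code with `cfg`. A minimal sketch, assuming a downstream crate with hypothetical features named `split` and `postgres`:

```rust
/// Reports which optional capabilities this (hypothetical) downstream crate
/// was compiled with. `split` and `postgres` are assumed feature names that
/// would forward to `recoco/function-split` and `recoco/source-postgres`.
pub fn enabled_capabilities() -> Vec<&'static str> {
    let mut caps = Vec::new();
    if cfg!(feature = "split") {
        caps.push("split");
    }
    if cfg!(feature = "postgres") {
        caps.push("postgres");
    }
    caps
}

/// Code that would call into a gated ReCoco module must itself be gated,
/// so the crate still compiles when the feature is turned off.
#[cfg(feature = "postgres")]
pub fn postgres_status() -> &'static str {
    "postgres source available"
}

#[cfg(not(feature = "postgres"))]
pub fn postgres_status() -> &'static str {
    "postgres source disabled at compile time"
}
```

Built with no features, `enabled_capabilities()` returns an empty list and the fallback `postgres_status` is compiled in; building with `--features postgres` swaps in the other definition without any runtime check.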

## 🚀 Quick Start

### Basic Text Processing

Here's a simple example that processes a string using a transient flow (in-memory, no persistence):

```rust
use recoco::prelude::*;
use recoco::builder::FlowBuilder;
use recoco::execution::evaluator::evaluate_transient_flow;
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Initialize library context (loads operation registry)
    recoco::lib_context::init_lib_context(None).await?;

    // 2. Create a flow builder
    let mut builder = FlowBuilder::new("hello_world").await?;

    // 3. Define input schema
    let input = builder.add_direct_input(
        "text".to_string(),
        schema::make_output_type(schema::BasicValueType::Str),
    )?;

    // 4. Add a text splitting transformation
    let output = builder.transform(
        "SplitBySeparators".to_string(),
        json!({ "separators_regex": [" "] }).as_object().unwrap().clone(),
        vec![(input, Some("text".to_string()))],
        None,
        "splitter".to_string(),
    ).await?;

    // 5. Set the output of the flow
    builder.set_direct_output(output)?;

    // 6. Build and execute the flow
    let flow = builder.build_transient_flow().await?;
    let result = evaluate_transient_flow(
        &flow.0,
        &vec![value::Value::Basic("Hello ReCoco".into())]
    ).await?;

    println!("Result: {:?}", result);
    Ok(())
}
```
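To build intuition for what the `SplitBySeparators` step above computes, here is a plain-Rust approximation that splits on literal separators instead of regexes. It is an illustrative assumption, not ReCoco's implementation: the real function takes `separators_regex`, and its output may carry extra fields (such as chunk positions) that this sketch omits.

```rust
/// Splits `text` on any of the literal `separators`, dropping empty pieces.
/// Illustrative approximation only; ReCoco's splitter matches regexes.
pub fn split_by_separators(text: &str, separators: &[&str]) -> Vec<String> {
    let mut chunks = Vec::new();
    let mut rest = text;
    loop {
        // Find the earliest position at which any non-empty separator occurs.
        let next = separators
            .iter()
            .filter(|sep| !sep.is_empty())
            .filter_map(|sep| rest.find(*sep).map(|pos| (pos, sep.len())))
            .min_by_key(|&(pos, _)| pos);
        match next {
            Some((pos, len)) => {
                // Keep the text before the separator, skip the separator itself.
                if pos > 0 {
                    chunks.push(rest[..pos].to_string());
                }
                rest = &rest[pos + len..];
            }
            None => {
                // No separator left: whatever remains is the final chunk.
                if !rest.is_empty() {
                    chunks.push(rest.to_string());
                }
                return chunks;
            }
        }
    }
}
```

For the Quick Start input, `split_by_separators("Hello ReCoco", &[" "])` yields `["Hello", "ReCoco"]`.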

### Custom Operations

You can define custom operations by implementing the `SimpleFunctionExecutor` and `SimpleFunctionFactoryBase` traits:

```rust
use recoco::ops::sdk::*;

struct UpperCaseExecutor;

#[async_trait::async_trait]
impl SimpleFunctionExecutor for UpperCaseExecutor {
    async fn execute(&self, inputs: Vec<Value>) -> Result<Vec<Value>, ExecutionError> {
        let input_str = inputs[0].as_basic_str()
            .ok_or_else(|| ExecutionError::InvalidInput("Expected string".into()))?;

        Ok(vec![Value::Basic(input_str.to_uppercase().into())])
    }
}

// Register your custom operation
// See examples/custom_op.rs for complete implementation
```
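The snippet above depends on ReCoco's SDK types. To show the overall shape (an executor trait, an implementation, and name-based registration) without relying on those exact signatures, here is a self-contained, synchronous model. `StringFunction`, `Registry`, and their methods are hypothetical stand-ins, not ReCoco APIs; the real executor trait is async and operates on `Value`s.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an operation executor: one string in, one out.
pub trait StringFunction {
    fn evaluate(&self, input: &str) -> Result<String, String>;
}

/// Mirrors the `UpperCaseExecutor` above.
pub struct UpperCase;

impl StringFunction for UpperCase {
    fn evaluate(&self, input: &str) -> Result<String, String> {
        Ok(input.to_uppercase())
    }
}

/// Hypothetical registry: operations are looked up by name, the way a flow
/// refers to a transform such as "SplitBySeparators" by its registered name.
#[derive(Default)]
pub struct Registry {
    ops: HashMap<String, Box<dyn StringFunction>>,
}

impl Registry {
    /// Registers (or replaces) an operation under `name`.
    pub fn register(&mut self, name: &str, op: Box<dyn StringFunction>) {
        self.ops.insert(name.to_string(), op);
    }

    /// Looks up `name` and runs it, failing if nothing is registered.
    pub fn call(&self, name: &str, input: &str) -> Result<String, String> {
        let op = self
            .ops
            .get(name)
            .ok_or_else(|| format!("unknown operation: {name}"))?;
        op.evaluate(input)
    }
}
```

Registering `UpperCase` under `"UpperCase"` and calling it with `"recoco"` returns `"RECOCO"`; an unregistered name produces an error instead of a panic.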

### Flow Construction Pattern

All flows follow this consistent pattern:

```rust
// 1. Initialize library context
recoco::lib_context::init_lib_context(None).await?;

// 2. Create builder
let mut builder = FlowBuilder::new("my_flow").await?;

// 3. Define inputs
let input = builder.add_direct_input(/*...*/)?;

// 4. Chain transformations (each `transform` call is async)
let step1 = builder.transform(/*...*/).await?;
let step2 = builder.transform(/*...*/).await?;

// 5. Set outputs
builder.set_direct_output(step2)?;

// 6. Build and execute (`inputs` holds one value per declared input)
let flow = builder.build_transient_flow().await?;
let result = evaluate_transient_flow(&flow.0, &inputs).await?;
```
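As a mental model of this pattern, the toy builder below chains named string-to-string steps and then evaluates them in order, much like a transient flow. `ToyFlowBuilder` and its methods are hypothetical and far simpler than `FlowBuilder`: no schemas, no async, and no persistence.

```rust
/// A transformation step: takes the previous value, returns the next one.
type Step = Box<dyn Fn(String) -> String>;

/// Hypothetical, synchronous stand-in for a flow builder.
#[derive(Default)]
pub struct ToyFlowBuilder {
    steps: Vec<(String, Step)>,
}

impl ToyFlowBuilder {
    /// Appends a named step and returns `self` so calls can be chained.
    pub fn transform(mut self, name: &str, f: impl Fn(String) -> String + 'static) -> Self {
        self.steps.push((name.to_string(), Box::new(f)));
        self
    }

    /// Runs every step in declaration order, returning the final output
    /// together with the names of the steps that ran.
    pub fn evaluate(&self, input: &str) -> (String, Vec<&str>) {
        let mut value = input.to_string();
        let mut trace = Vec::new();
        for (name, step) in &self.steps {
            value = step(value);
            trace.push(name.as_str());
        }
        (value, trace)
    }
}
```

Chaining `.transform("trim", ...)` and `.transform("upper", ...)` and evaluating `"  hello recoco "` returns `"HELLO RECOCO"` with the trace `["trim", "upper"]`, mirroring how each step's output feeds the next.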

## Examples

Check out the `examples/` directory for more usage patterns:

- `transient.rs`: Basic Hello World
- `file_processing.rs`: Line-by-line file processing
- `custom_op.rs`: Defining and registering custom Rust operations
- `detect_lang.rs`: Using built-in functions

Run examples with the required features:
```bash
# Basic transient flow
cargo run -p recoco --example transient --features function-split

# File processing
cargo run -p recoco --example file_processing --features function-split

# Custom operations
cargo run -p recoco --example custom_op

# Language detection
cargo run -p recoco --example detect_lang --features function-detect-lang
```
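The `detect_lang` example exercises the built-in `function-detect-lang` feature. For a rough idea of the kind of mapping such a function can perform, here is a hypothetical extension-based lookup; ReCoco's actual detection logic and its list of supported languages may differ.

```rust
use std::path::Path;

/// Hypothetical extension-to-language table (a small, illustrative subset).
const EXTENSIONS: &[(&str, &str)] = &[
    ("rs", "rust"),
    ("py", "python"),
    ("ts", "typescript"),
    ("go", "go"),
    ("md", "markdown"),
];

/// Guesses a language from a file name's extension, compared
/// case-insensitively. Returns `None` for unknown or missing extensions.
pub fn detect_language(file_name: &str) -> Option<&'static str> {
    let ext = Path::new(file_name).extension()?.to_str()?.to_ascii_lowercase();
    EXTENSIONS
        .iter()
        .find(|(e, _)| *e == ext)
        .map(|(_, lang)| *lang)
}
```

With this table, `detect_language("src/main.rs")` gives `Some("rust")`, while a file without an extension such as `Makefile` gives `None`.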

## 🛠️ Development

### Building

```bash
# Build with default features (source-local-file only)
cargo build

# Build with specific features
cargo build --features "function-split,source-postgres"

# Build with all features (includes all heavy dependencies)
cargo build --features full

# Build specific feature bundles
cargo build --features all-sources    # All source connectors
cargo build --features all-targets    # All target connectors
cargo build --features all-functions  # All transform functions
```

### Testing

```bash
# Run all tests with default features
cargo test

# Run tests with specific features
cargo test --features "function-split,source-postgres"

# Run tests with all features
cargo test --features full

# Run a specific test with output
cargo test test_name -- --nocapture
```

### Code Quality

```bash
# Check code formatting
cargo fmt --all -- --check

# Format code
cargo fmt

# Run clippy with all features
cargo clippy --all-features -- -D warnings

# Run clippy for specific workspace member
cargo clippy -p recoco --all-features
```

## 🏗️ Architecture

### Core Dataflow Model

ReCoco implements an **incremental dataflow engine** where data flows through **Flows**:

```
Sources → Transforms → Targets
```

- **Sources**: Ingest data (files, database rows, cloud storage objects)
- **Transforms**: Process data (split, embed, map, filter, extract)
- **Targets**: Persist results (vector databases, graph databases, relational databases)

The engine tracks **data lineage**: when source data changes, only the affected downstream computations are re-executed. This makes ReCoco highly efficient for processing large datasets that change incrementally.
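One way to picture lineage tracking: record which derived outputs each input produced, and when an input changes, revisit only its transitive descendants. The sketch below is a hypothetical in-memory model (`Lineage`, `record`, `affected`), not ReCoco's tracking schema, which for persisted flows lives in a database.

```rust
use std::collections::{BTreeSet, HashMap, VecDeque};

/// Hypothetical lineage graph: each node lists the nodes derived from it.
#[derive(Default)]
pub struct Lineage {
    downstream: HashMap<String, Vec<String>>,
}

impl Lineage {
    /// Records that `child` was computed from `parent`.
    pub fn record(&mut self, parent: &str, child: &str) {
        self.downstream
            .entry(parent.to_string())
            .or_default()
            .push(child.to_string());
    }

    /// Breadth-first walk returning every node transitively derived from
    /// `changed`, i.e. the computations to re-run. `seen` guards against cycles.
    pub fn affected(&self, changed: &str) -> BTreeSet<String> {
        let mut seen = BTreeSet::new();
        let mut queue = VecDeque::from([changed.to_string()]);
        while let Some(node) = queue.pop_front() {
            for child in self.downstream.get(&node).into_iter().flatten() {
                if seen.insert(child.clone()) {
                    queue.push_back(child.clone());
                }
            }
        }
        seen
    }
}
```

If `a.md` produced two chunks and one chunk produced an embedding, a change to `a.md` marks those three nodes for recomputation while anything derived from `b.md` is left untouched.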

### Two Flow Execution Modes

1. **Transient Flows** - In-memory execution without persistence
   - Use `FlowBuilder::build_transient_flow()`
   - Evaluate with `execution::evaluator::evaluate_transient_flow()`
   - No database tracking, ideal for testing and single-run operations
   - Fast and lightweight for one-off transformations

2. **Persisted Flows** - Tracked execution with incremental updates
   - Use `FlowBuilder::build_flow()` to create persisted flow spec
   - Requires database setup for state tracking
   - Enables incremental processing when data changes
   - Perfect for production ETL pipelines
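To make the difference between the two modes concrete, the sketch below models the state a persisted flow carries between runs: a map from source key to content fingerprint. Everything here (`FingerprintState`, `changed`) is hypothetical and much simpler than ReCoco's database-backed tracking. It also uses std's `DefaultHasher`, whose output is not guaranteed to be stable across Rust releases, so a real persisted store would need a stable hash.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Content fingerprint (illustrative; not stable across Rust versions).
fn fingerprint(content: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

/// Hypothetical tracking state: last-seen fingerprint per source key.
#[derive(Default)]
pub struct FingerprintState {
    seen: HashMap<String, u64>,
}

impl FingerprintState {
    /// Returns the keys whose content is new or changed since the last call
    /// and updates the stored fingerprints. Deletions are ignored here.
    pub fn changed<'a>(&mut self, sources: &[(&'a str, &str)]) -> Vec<&'a str> {
        let mut out = Vec::new();
        for &(key, content) in sources {
            let fp = fingerprint(content);
            if self.seen.get(key) != Some(&fp) {
                self.seen.insert(key.to_string(), fp);
                out.push(key);
            }
        }
        out
    }
}
```

Reusing one `FingerprintState` across runs behaves like a persisted flow: a second call with unchanged content reports nothing. Creating a fresh state for every run behaves like a transient flow and reports every key.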

### Module Organization

```
recoco/
├── base/          - Core data types (schema, value, spec, json_schema)
├── builder/       - Flow construction API (FlowBuilder, analysis, planning)
├── execution/     - Runtime engine (evaluator, memoization, indexing, tracking)
├── ops/           - Operation implementations
│   ├── sources/   - Data ingestion (local-file, postgres, s3, azure, gdrive)
│   ├── functions/ - Transforms (split, embed, json, detect-lang, extract-llm)
│   ├── targets/   - Data persistence (postgres, qdrant, neo4j, kuzu)
│   ├── interface.rs  - Trait definitions for all operation types
│   ├── registry.rs   - Operation registration and lookup
│   └── sdk.rs        - Public API for custom operations
├── lib_context.rs - Global library initialization and context management
└── prelude.rs     - Common imports (use recoco::prelude::*)
```

## 🤝 Contributing

Contributions are welcome! Here's how to get started:

1. **Fork the repository** and clone your fork
2. **Create a feature branch**: `git checkout -b feature/my-new-feature`
3. **Make your changes** following our code style and conventions
4. **Run tests**: `cargo test --features full`
5. **Run formatting**: `cargo fmt --all`
6. **Run clippy**: `cargo clippy --all-features -- -D warnings`
7. **Commit your changes** using [Conventional Commits](https://www.conventionalcommits.org/):
   - `feat:` for new features
   - `fix:` for bug fixes
   - `docs:` for documentation
   - `chore:` for maintenance tasks
   - `refactor:` for code restructuring
8. **Push to your fork** and **submit a pull request**

Please see [CONTRIBUTING.md](CONTRIBUTING.md) for detailed guidelines.

### Development Tips

- **Feature Gates**: When adding new dependencies, make them optional with feature flags
- **Testing**: Include unit tests alongside implementation files
- **Documentation**: Add doc comments for public APIs
- **Examples**: For significant features, consider adding an example in `crates/recoco/examples/`

## 🔗 Relationship to CocoIndex

ReCoco is a fork of [CocoIndex](https://github.com/cocoindex-io/cocoindex):

| Aspect | CocoIndex (Upstream) | ReCoco (Fork) |
|--------|---------------------|---------------|
| **Primary Language** | Python with Rust core | Pure Rust |
| **API Surface** | Python-only | Full Rust API |
| **Distribution** | Not on crates.io | Published to crates.io |
| **Dependencies** | All bundled together | Feature-gated and modular |
| **Target Audience** | Python developers | Rust developers |
| **License** | Apache-2.0 | Apache-2.0 |

We aim to maintain compatibility with CocoIndex's core dataflow engine to allow porting upstream improvements, while diverging significantly in the API surface and dependency management to better serve Rust users.

Code headers maintain dual copyright (CocoIndex upstream, Knitli Inc. for ReCoco modifications) under Apache-2.0.

## 📄 License

Licensed under the [Apache License 2.0](LICENSE); see [LICENSE](LICENSE) for the full license text and [NOTICE](NOTICE) for additional notices.

This project is [REUSE 3.3 compliant](https://reuse.software/).

## 🙏 Acknowledgments

ReCoco is built on the excellent foundation provided by [CocoIndex](https://github.com/cocoindex-io/cocoindex). We're grateful to the CocoIndex team for creating such a powerful and well-designed dataflow engine.

---

<div align="center">

**Built with 🦀 by [Knitli Inc.](https://knit.li)**

[Documentation](https://docs.rs/recoco) • [Crates.io](https://crates.io/crates/recoco) • [GitHub](https://github.com/knitli/recoco) • [Issues](https://github.com/knitli/recoco/issues)

</div>