<!--
SPDX-FileCopyrightText: 2026 Knitli Inc.

SPDX-License-Identifier: MIT OR Apache-2.0
-->

# ReCoco Content Hashing Integration

**Analysis Date**: January 27, 2026
**Finding**: ReCoco already implements blake3-based content hashing for deduplication

---

## Executive Summary

ReCoco has a comprehensive content-addressed caching system using blake3 hashing. We can leverage this existing infrastructure instead of implementing our own content hashing for D1 deduplication.

**Key Insight**: ReCoco's `Fingerprint` type (16-byte blake3 hash) can be used directly as D1 primary keys via `KeyPart::Bytes`.

---

## ReCoco's Fingerprinting System

### Core Components

#### 1. Fingerprint Type
**Location**: `/home/knitli/recoco/crates/recoco-utils/src/fingerprint.rs`

```rust
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct Fingerprint(pub [u8; 16]);

impl Fingerprint {
    pub fn to_base64(self) -> String { /* ... */ }
    pub fn from_base64(s: &str) -> Result<Self> { /* ... */ }
    pub fn as_slice(&self) -> &[u8] { /* ... */ }
}
```

**Features**:
- 16-byte blake3 hash (128 bits)
- Base64 serialization for JSON/strings
- Implements Hash, Eq, Ord for use as HashMap/BTreeMap keys
- Serde support for serialization
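Those properties can be sketched with a std-only stand-in for `Fingerprint` (the base64 encoder below is a simplified illustration, not recoco's actual implementation):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for recoco_utils::fingerprint::Fingerprint.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Fp([u8; 16]);

const B64: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

impl Fp {
    // Minimal standard base64 with '=' padding: 16 bytes -> 24 chars.
    fn to_base64(self) -> String {
        let mut out = String::new();
        for chunk in self.0.chunks(3) {
            let b = [chunk[0], *chunk.get(1).unwrap_or(&0), *chunk.get(2).unwrap_or(&0)];
            let n = (u32::from(b[0]) << 16) | (u32::from(b[1]) << 8) | u32::from(b[2]);
            for i in 0..4usize {
                if i > chunk.len() {
                    out.push('='); // positions past the data are padding
                } else {
                    out.push(B64[((n >> (18 - 6 * i)) & 63) as usize] as char);
                }
            }
        }
        out
    }
}

fn main() {
    let fp = Fp([0xAB; 16]);
    // Eq + Hash let fingerprints key a HashMap directly.
    let mut index: HashMap<Fp, &str> = HashMap::new();
    index.insert(fp, "src/main.rs");
    assert_eq!(index.get(&fp), Some(&"src/main.rs"));
    // 16 bytes always encode to 24 base64 characters.
    assert_eq!(fp.to_base64().len(), 24);
}
```

This is why the "24 characters when serialized" figure below holds: base64 emits 4 output characters per 3 input bytes.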

#### 2. Fingerprinter Builder
**Location**: Same file

```rust
#[derive(Clone, Default)]
pub struct Fingerprinter {
    hasher: blake3::Hasher,
}

impl Fingerprinter {
    pub fn into_fingerprint(self) -> Fingerprint { /* ... */ }

    pub fn with<S: Serialize + ?Sized>(
        self,
        value: &S,
    ) -> Result<Self, FingerprinterError> { /* ... */ }

    pub fn write<S: Serialize + ?Sized>(
        &mut self,
        value: &S,
    ) -> Result<(), FingerprinterError> { /* ... */ }
}
```

**Features**:
- Implements `serde::Serializer` - can hash any Serialize type
- Type-aware hashing (includes type tags: "s" for str, "i8" for int64, etc.)
- Deterministic across runs
- Handles complex nested structures (structs, enums, maps, sequences)
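The type-tagging idea can be illustrated with a std-only sketch (using std's `DefaultHasher` in place of blake3; the tags mirror the ones mentioned above but the exact scheme is recoco's, not shown here). It demonstrates why the string `"1"` and the integer `1` can never hash to the same value:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: prepend a type tag before hashing the value, so
// values of different types occupy disjoint parts of the hash space.
fn hash_str(v: &str) -> u64 {
    let mut h = DefaultHasher::new();
    b"s".hash(&mut h); // type tag for str
    v.hash(&mut h);
    h.finish()
}

fn hash_i64(v: i64) -> u64 {
    let mut h = DefaultHasher::new();
    b"i8".hash(&mut h); // type tag for int64 (8 bytes)
    v.to_le_bytes().hash(&mut h);
    h.finish()
}

fn main() {
    // Same rendered text, different types: distinct hashes.
    assert_ne!(hash_str("1"), hash_i64(1));
    // Deterministic for the same typed input.
    assert_eq!(hash_str("abc"), hash_str("abc"));
}
```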

#### 3. Memoization System
**Location**: `/home/knitli/recoco/crates/recoco-core/src/execution/memoization.rs`

```rust
pub struct StoredMemoizationInfo {
    pub cache: HashMap<Fingerprint, StoredCacheEntry>,
    pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,
    pub content_hash: Option<String>, // DEPRECATED
}

pub struct EvaluationMemory {
    cache: Option<Mutex<HashMap<Fingerprint, CacheEntry>>>,
    uuids: Mutex<HashMap<Fingerprint, UuidEntry>>,
    // ...
}
```

**Features**:
- Uses `Fingerprint` as cache keys
- Stores computation results keyed by input fingerprint
- Enables content-addressed deduplication
- Note: the `content_hash` field is deprecated, which suggests ReCoco itself is converging on `Fingerprint` for all content hashing
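The cache behavior above can be sketched with a std-only stand-in (`[u8; 16]` in place of `Fingerprint`; this is a minimal illustration of content-addressed memoization, not recoco's `EvaluationMemory`):

```rust
use std::collections::HashMap;

type Fp = [u8; 16]; // stand-in for recoco's Fingerprint

// Minimal content-addressed memo: results keyed by input fingerprint,
// so identical inputs are computed only once.
struct Memo {
    cache: HashMap<Fp, String>,
    computed: usize, // counts actual (non-cached) computations
}

impl Memo {
    fn new() -> Self {
        Memo { cache: HashMap::new(), computed: 0 }
    }

    fn get_or_compute(&mut self, fp: Fp, compute: impl FnOnce() -> String) -> String {
        if let Some(v) = self.cache.get(&fp) {
            return v.clone(); // cache hit: no recomputation
        }
        self.computed += 1;
        let v = compute();
        self.cache.insert(fp, v.clone());
        v
    }
}

fn main() {
    let mut memo = Memo::new();
    let fp = [7u8; 16];
    let a = memo.get_or_compute(fp, || "analysis result".to_string());
    let b = memo.get_or_compute(fp, || unreachable!("cache hit expected"));
    assert_eq!(a, b);
    assert_eq!(memo.computed, 1); // second call never ran its closure
}
```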

---

## Integration with D1

### Current D1 KeyValue System

D1 target uses `KeyValue` for primary keys:

```rust
pub enum KeyPart {
    Bytes(Bytes),        // ← Can hold Fingerprint!
    Str(Arc<str>),
    Bool(bool),
    Int64(i64),
    Range(RangeValue),
    Uuid(uuid::Uuid),
    Date(chrono::NaiveDate),
    Struct(Vec<KeyPart>),
}

pub struct KeyValue(pub Box<[KeyPart]>);
```

### Proposed Integration

**Option 1: Use Fingerprint directly as primary key**

```rust
// In ThreadFlowBuilder or source operator:
use recoco_utils::fingerprint::{Fingerprint, Fingerprinter};

// Compute fingerprint of file content
let mut fp = Fingerprinter::default();
fp.write(&file_content)?;
let fingerprint = fp.into_fingerprint();

// Use as D1 primary key
let key = KeyValue(Box::new([
    KeyPart::Bytes(Bytes::from(fingerprint.as_slice().to_vec()))
]));
```

**Option 2: Expose fingerprint as a field**

```rust
// Add fingerprint to schema
FieldSchema::new(
    "content_hash",
    EnrichedValueType {
        typ: ValueType::Basic(BasicValueType::Bytes),
        nullable: false,
        attrs: Default::default(),
    },
)

// Include in field values
FieldValues {
    fields: vec![
        Value::Basic(BasicValue::Bytes(
            Bytes::from(fingerprint.as_slice().to_vec())
        )),
        // ... other fields
    ],
}
```

---

## Benefits of Using ReCoco Fingerprints

### 1. **Consistency**
- Same hashing algorithm across entire ReCoco pipeline
- Deterministic hashing ensures reproducibility
- Type-aware hashing prevents collisions

### 2. **Performance**
- blake3 is extremely fast (multi-threaded, SIMD optimized)
- 16-byte fingerprints are compact (vs 32-byte SHA256 or 64-byte SHA512)
- Already integrated into ReCoco's execution engine

### 3. **Deduplication**
- Automatic deduplication at ReCoco level
- Cache hits for identical content
- Incremental updates only for changed content

### 4. **Integration**
- No additional dependencies (blake3 already in ReCoco)
- Works seamlessly with memoization system
- Compatible with D1 primary keys via `KeyPart::Bytes`

---

## Implementation Plan

### Phase 1: Expose Fingerprints in Thread Operators

**Modify `thread_parse` operator** to include content fingerprint:

```rust
// In thread-flow/src/functions/parse.rs

use recoco_utils::fingerprint::{Fingerprint, Fingerprinter};

pub struct ParsedDocument {
    pub symbols: LTable,
    pub imports: LTable,
    pub calls: LTable,
    pub content_fingerprint: Fingerprint,  // NEW
}

impl ThreadParseFactory {
    async fn execute(&self, inputs: &Inputs) -> Result<ParsedDocument> {
        let content = &inputs.content;

        // Compute content fingerprint
        let mut fp = Fingerprinter::default();
        fp.write(content)?;
        let content_fingerprint = fp.into_fingerprint();

        // Parse content
        let parsed = parse_source_code(content, &inputs.language)?;

        Ok(ParsedDocument {
            symbols: extract_symbols(&parsed),
            imports: extract_imports(&parsed),
            calls: extract_calls(&parsed),
            content_fingerprint,
        })
    }
}
```

### Phase 2: Update D1 Target to Use Fingerprints

**Modify D1 schema** to use fingerprint as primary key:

```sql
CREATE TABLE code_symbols (
    -- Use fingerprint as primary key
    content_hash BLOB PRIMARY KEY,  -- 16 bytes from Fingerprint

    file_path TEXT NOT NULL,
    symbol_name TEXT NOT NULL,
    symbol_type TEXT NOT NULL,
    line_start INTEGER,
    line_end INTEGER,
    source_code TEXT,
    language TEXT,
    indexed_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

-- Index for file queries
CREATE INDEX idx_symbols_file ON code_symbols(file_path);
CREATE INDEX idx_symbols_name ON code_symbols(symbol_name);
```

**Update D1TargetFactory** to extract fingerprint:

```rust
impl D1TargetExecutor {
    async fn apply_mutation(&self, upserts: Vec<...>) -> Result<()> {
        for upsert in upserts {
            // Extract fingerprint from key
            let fingerprint_bytes = match &upsert.key.0[0] {
                KeyPart::Bytes(b) => b.clone(),
                _ => anyhow::bail!("expected Bytes for fingerprint key"),
            };

            // Convert to base64 for D1 storage
            let content_hash = BASE64_STANDARD.encode(&fingerprint_bytes);

            // Build UPSERT
            let sql = format!(
                "INSERT INTO code_symbols (content_hash, ...)
                 VALUES (?, ...)
                 ON CONFLICT (content_hash) DO UPDATE SET ..."
            );

            self.execute_d1(&sql, params).await?;
        }
        Ok(())
    }
}
```

### Phase 3: Enable Incremental Updates

**Add content-hash check** before re-analysis:

```rust
// In ThreadFlowBuilder or Worker handler

async fn should_analyze(
    file_path: &str,
    content: &str,
    d1: &D1Client,
) -> Result<bool> {
    // Compute current fingerprint
    let mut fp = Fingerprinter::default();
    fp.write(content)?;
    let current_fp = fp.into_fingerprint();

    // Query D1 for existing fingerprint
    let existing_fp = d1.query_fingerprint(file_path).await?;

    // Only re-analyze if changed
    Ok(existing_fp != Some(current_fp))
}
```

---

## Performance Characteristics

### blake3 Hashing Speed
- **Throughput**: ~10 GB/s on modern CPUs
- **Latency**: <1μs for typical code files (<100 KB)
- **Comparison**: 10-100x faster than SHA256/SHA512

### Fingerprint Size
- **Storage**: 16 bytes per fingerprint
- **Base64**: 24 characters when serialized
- **Collision Risk**: 2^128 space (negligible for code files)
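The "negligible" claim follows from the birthday bound: hashing $n$ distinct inputs into a space of $2^{128}$ values yields a collision with probability roughly

```latex
P(\text{collision}) \approx \frac{n^{2}}{2 \cdot 2^{128}} = \frac{n^{2}}{2^{129}}
```

so even an index of a billion files ($n \approx 2^{30}$) collides with probability about $2^{-69}$.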

### Cache Hit Rates
With content-addressed caching:
- **Unchanged files**: 100% cache hit (no re-analysis)
- **Incremental updates**: Only changed files re-analyzed
- **Expected speedup**: 50-100x on repeated analysis

---

## Comparison: Custom Hash vs ReCoco Fingerprint

| Aspect | Custom Hash (md5/sha256) | ReCoco Fingerprint (blake3) |
|--------|-------------------------|----------------------------|
| **Performance** | Slower (SHA256: ~500 MB/s) | Faster (blake3: ~10 GB/s) |
| **Size** | 32 bytes (SHA256) | 16 bytes (compact) |
| **Integration** | New dependency | Already in ReCoco |
| **Consistency** | Independent system | Matches ReCoco memoization |
| **Type Safety** | String/bytes only | Serde-aware (all types) |
| **Deduplication** | Manual | Automatic via memoization |

**Recommendation**: Use ReCoco's Fingerprint system exclusively.

---

## Migration Path

### Existing D1 Schemas

For D1 schemas already using `content_hash TEXT`:

**Option A: Keep as base64 string**
```rust
let fingerprint_str = fingerprint.to_base64();  // 24-char base64 string
```

**Option B: Migrate to BLOB**
```sql
-- Migration sketch. Note: SQLite/D1 has no built-in base64_decode,
-- so the decode step must be performed in application code.
ALTER TABLE code_symbols ADD COLUMN content_fp BLOB;
-- (populate content_fp by decoding each row's base64 content_hash
--  in application code, then:)
ALTER TABLE code_symbols DROP COLUMN content_hash;
ALTER TABLE code_symbols RENAME COLUMN content_fp TO content_hash;
```

**Recommendation**: Use base64 strings for now (easier debugging, human-readable).

---

## Next Steps

### Immediate
1. ✅ Analyze ReCoco fingerprinting system (this document)
2. ⏳ Update `thread_parse` to expose `content_fingerprint`
3. ⏳ Modify D1 target to use fingerprints as primary keys
4. ⏳ Add incremental update logic with fingerprint comparison

### Short-Term
5. ⏳ Test content-hash deduplication locally
6. ⏳ Benchmark cache hit rates
7. ⏳ Document fingerprint usage in ThreadFlowBuilder

### Long-Term
8. ⏳ Integrate with ReCoco memoization for cross-session caching
9. ⏳ Add fingerprint-based query APIs
10. ⏳ Optimize for large-scale incremental updates

---

## Example: Complete Flow

```rust
// 1. User provides source code
let code = r#"
    fn main() {
        println!("Hello, world!");
    }
"#;

// 2. Compute fingerprint (ReCoco)
let mut fp = Fingerprinter::default();
fp.write(code)?;
let fingerprint = fp.into_fingerprint();
// fingerprint.to_base64() => "xK8H3vQm9..."

// 3. Check if already analyzed (D1)
let needs_analysis = !d1.has_fingerprint(&fingerprint).await?;

if needs_analysis {
    // 4. Parse and analyze (thread-ast-engine)
    let parsed = thread_parse(code, "rust")?;

    // 5. Build upsert with fingerprint key
    let upsert = ExportTargetUpsertEntry {
        key: KeyValue(Box::new([
            KeyPart::Bytes(Bytes::from(fingerprint.as_slice().to_vec()))
        ])),
        value: FieldValues {
            fields: vec![
                Value::Basic(BasicValue::Str("src/main.rs".into())),
                Value::Basic(BasicValue::Str("main".into())),
                // ... other fields
            ],
        },
        additional_key: serde_json::Value::Null,
    };

    // 6. UPSERT to D1 (deduplication automatic via primary key)
    d1.apply_mutation(vec![upsert], vec![]).await?;
}

// 7. Result: 50x+ speedup on repeated analysis!
```

---

## Conclusion

ReCoco's existing blake3-based fingerprinting system provides:
- **Better performance** than custom hashing
- **Seamless integration** with ReCoco memoization
- **Type-safe content hashing** via Serde
- **Compact 16-byte fingerprints**
- **Automatic deduplication**

**Recommendation**: Use ReCoco's `Fingerprint` type exclusively for all content-addressed caching in D1 and edge deployment.

No need to implement custom hashing - leverage what's already there! 🎯