use recoco::base::schema::{BasicValueType, EnrichedValueType, FieldSchema, ValueType};
use recoco::base::value::{BasicValue, FieldValues, KeyValue, Value};
use recoco::ops::factory_bases::TargetFactoryBase;
use recoco::ops::interface::{
ExportTargetDeleteEntry, ExportTargetMutationWithContext, ExportTargetUpsertEntry,
};
use thread_flow::targets::d1::{D1ExportContext, D1Spec, D1TargetFactory};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("🚀 Thread D1 Target Factory Test\n");
let d1_spec = D1Spec {
account_id: "test-account".to_string(),
database_id: "thread_test".to_string(),
api_token: "test-token".to_string(),
table_name: Some("code_symbols".to_string()),
};
println!("📋 Configuration:");
println!(" Database: {}", d1_spec.database_id);
println!(" Table: {}\n", d1_spec.table_name.as_ref().unwrap());
let factory = D1TargetFactory;
println!("✅ Target factory: {}", factory.name());
let key_fields_schema = vec![FieldSchema::new(
"content_hash",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Str),
nullable: false,
attrs: Default::default(),
},
)];
let value_fields_schema = vec![
FieldSchema::new(
"file_path",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Str),
nullable: false,
attrs: Default::default(),
},
),
FieldSchema::new(
"symbol_name",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Str),
nullable: false,
attrs: Default::default(),
},
),
FieldSchema::new(
"symbol_type",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Str),
nullable: false,
attrs: Default::default(),
},
),
FieldSchema::new(
"start_line",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Int64),
nullable: false,
attrs: Default::default(),
},
),
FieldSchema::new(
"end_line",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Int64),
nullable: false,
attrs: Default::default(),
},
),
FieldSchema::new(
"source_code",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Str),
nullable: false,
attrs: Default::default(),
},
),
FieldSchema::new(
"language",
EnrichedValueType {
typ: ValueType::Basic(BasicValueType::Str),
nullable: false,
attrs: Default::default(),
},
),
];
let metrics = thread_flow::monitoring::performance::PerformanceMetrics::new();
let export_context = D1ExportContext::new_with_default_client(
d1_spec.database_id.clone(),
d1_spec.table_name.clone().unwrap(),
d1_spec.account_id.clone(),
d1_spec.api_token.clone(),
key_fields_schema,
value_fields_schema,
metrics,
)
.expect("Failed to create D1 export context");
println!("🔧 Export context created");
println!(
" Key fields: {:?}",
export_context
.key_fields_schema
.iter()
.map(|f| &f.name)
.collect::<Vec<_>>()
);
println!(
" Value fields: {:?}\n",
export_context
.value_fields_schema
.iter()
.map(|f| &f.name)
.collect::<Vec<_>>()
);
let sample_entries = vec![
create_symbol_entry(
"abc123",
"src/main.rs",
"main",
"function",
1,
10,
"fn main() { ... }",
"rust",
),
create_symbol_entry(
"def456",
"src/lib.rs",
"Calculator",
"struct",
15,
50,
"pub struct Calculator { ... }",
"rust",
),
create_symbol_entry(
"ghi789",
"src/utils.ts",
"capitalize",
"function",
5,
8,
"export function capitalize(str: string) { ... }",
"typescript",
),
];
println!("📊 Sample Data:");
for (i, entry) in sample_entries.iter().enumerate() {
println!(" {}. {:?}", i + 1, get_symbol_name(&entry.value));
}
println!();
println!("🔄 Testing UPSERT operation...");
let first_key = sample_entries[0].key.clone();
let mutation = recoco::ops::interface::ExportTargetMutation {
upserts: sample_entries,
deletes: vec![],
};
let _mutation_with_context = ExportTargetMutationWithContext {
mutation,
export_context: &export_context,
};
println!(" ⚠️ Skipping actual HTTP call (test credentials)");
println!(" In production, this would:");
println!(" 1. Convert ReCoco values to JSON");
println!(" 2. Build UPSERT SQL statements");
println!(" 3. Execute batch via D1 HTTP API");
println!(" 4. Handle response and errors\n");
println!("🗑️ Testing DELETE operation...");
let delete_entries = vec![ExportTargetDeleteEntry {
key: first_key,
additional_key: serde_json::Value::Null,
}];
let delete_mutation = recoco::ops::interface::ExportTargetMutation {
upserts: vec![],
deletes: delete_entries,
};
let _delete_mutation_with_context = ExportTargetMutationWithContext {
mutation: delete_mutation,
export_context: &export_context,
};
println!(" ⚠️ Skipping actual HTTP call (test credentials)");
println!(" In production, this would:");
println!(" 1. Extract key from KeyValue");
println!(" 2. Build DELETE SQL statement");
println!(" 3. Execute via D1 HTTP API\n");
println!("📝 Example SQL that would be generated:\n");
println!(" UPSERT:");
println!(
" INSERT INTO code_symbols (content_hash, file_path, symbol_name, symbol_type, start_line, end_line, source_code, language)"
);
println!(" VALUES (?, ?, ?, ?, ?, ?, ?, ?)");
println!(" ON CONFLICT DO UPDATE SET");
println!(" file_path = excluded.file_path,");
println!(" symbol_name = excluded.symbol_name,");
println!(" symbol_type = excluded.symbol_type,");
println!(" start_line = excluded.start_line,");
println!(" end_line = excluded.end_line,");
println!(" source_code = excluded.source_code,");
println!(" language = excluded.language;\n");
println!(" DELETE:");
println!(" DELETE FROM code_symbols WHERE content_hash = ?;\n");
println!("✅ D1 Target Factory Test Complete!\n");
println!("💡 Next Steps:");
println!(" 1. Set up local D1: wrangler d1 execute thread_test --local --file=schema.sql");
println!(" 2. Update credentials to use real Cloudflare account");
println!(" 3. Integrate into ThreadFlowBuilder for full pipeline");
println!(" 4. Test with real D1 database (local or production)");
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn create_symbol_entry(
hash: &str,
file_path: &str,
symbol_name: &str,
symbol_type: &str,
start_line: i64,
end_line: i64,
source_code: &str,
language: &str,
) -> ExportTargetUpsertEntry {
use recoco::base::value::KeyPart;
let key = KeyValue(Box::new([KeyPart::Str(hash.into())]));
let value = FieldValues {
fields: vec![
Value::Basic(BasicValue::Str(file_path.into())),
Value::Basic(BasicValue::Str(symbol_name.into())),
Value::Basic(BasicValue::Str(symbol_type.into())),
Value::Basic(BasicValue::Int64(start_line)),
Value::Basic(BasicValue::Int64(end_line)),
Value::Basic(BasicValue::Str(source_code.into())),
Value::Basic(BasicValue::Str(language.into())),
],
};
ExportTargetUpsertEntry {
key,
additional_key: serde_json::Value::Null,
value,
}
}
fn get_symbol_name(fields: &FieldValues) -> String {
if let Some(Value::Basic(BasicValue::Str(s))) = fields.fields.get(1) {
s.to_string()
} else {
"unknown".to_string()
}
}