1mod tests;
2pub mod timon_engine;
3
4#[cfg(target_os = "android")]
6pub mod android {
7 use crate::timon_engine::{
8 cloud_fetch_parquet, cloud_fetch_parquet_batch, cloud_sink_parquet, cloud_sync_parquet, create_database, create_table, delete_database,
9 delete_table, init_bucket, init_timon, insert, list_databases, list_tables, query,
10 };
11 use jni::objects::{JClass, JObject, JString, JValue};
12 use jni::sys::{jint, jstring};
13 use jni::JNIEnv;
14 use jni::NativeMethod;
15 use std::collections::HashMap;
16 use std::ffi::c_void;
17 use std::sync::LazyLock;
18 use tokio::runtime::Runtime;
19
20 static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
22 Runtime::new().unwrap_or_else(|e| {
23 eprintln!("CRITICAL: Failed to create tokio runtime for JNI interface: {:?}", e);
24 std::process::abort(); })
26 });
27
28 fn jstring_to_rust_string(env: &mut JNIEnv, j_string: &JString) -> Result<String, String> {
30 env
31 .get_string(j_string)
32 .map(|jstr| jstr.into())
33 .map_err(|e| format!("Failed to convert Java String to Rust String: {:?}", e))
34 }
35
36 fn rust_string_to_jstring(env: &mut JNIEnv, rust_string: &str) -> Result<jstring, String> {
38 env
39 .new_string(rust_string)
40 .map(|jstr| jstr.into_raw())
41 .map_err(|e| format!("Failed to create Java String from Rust String: {:?}", e))
42 }
43
44 fn return_jstring_or_fallback(env: &mut JNIEnv, msg: &str) -> jstring {
47 if let Ok(j) = rust_string_to_jstring(env, msg) {
48 return j;
49 }
50 eprintln!("JNI: failed to create string for response, using fallback");
51 const FALLBACK: &str = r#"{"error":"JNI string conversion failed"}"#;
52 rust_string_to_jstring(env, FALLBACK).unwrap_or(std::ptr::null_mut())
53 }
54
55 #[no_mangle]
57 pub unsafe extern "C" fn nativeInitTimon(
58 mut env: JNIEnv,
59 _class: JClass,
60 storage_path: JString,
61 bucket_interval: jint,
62 username: JString,
63 ) -> jstring {
64 let rust_storage_path = match jstring_to_rust_string(&mut env, &storage_path) {
66 Ok(s) => s,
67 Err(e) => {
68 eprintln!("Error converting storage_path: {}", e);
69 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
70 }
71 };
72 let rust_bucket_interval: u32 = bucket_interval as u32;
74 let rust_username = match jstring_to_rust_string(&mut env, &username) {
75 Ok(s) => s,
76 Err(e) => {
77 eprintln!("Error converting username: {}", e);
78 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
79 }
80 };
81
82 match init_timon(&rust_storage_path, rust_bucket_interval, &rust_username) {
83 Ok(result) => {
84 let json_string = result.to_string();
85 return_jstring_or_fallback(&mut env, &json_string)
86 }
87 Err(err) => {
88 let err_message = format!("Failed to initialize Timon: {:?}", err);
89 return_jstring_or_fallback(&mut env, &err_message)
90 }
91 }
92 }
93
94 #[no_mangle]
95 pub unsafe extern "C" fn nativeCreateDatabase(mut env: JNIEnv, _class: JClass, db_name: JString) -> jstring {
96 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
97 Ok(s) => s,
98 Err(e) => {
99 eprintln!("Error converting db_name: {}", e);
100 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
101 }
102 };
103
104 match create_database(&rust_db_name) {
105 Ok(result) => {
106 let json_string = result.to_string();
107 return_jstring_or_fallback(&mut env, &json_string)
108 }
109 Err(err) => {
110 let err_message = format!("Failed to create database: {:?}", err);
111 return_jstring_or_fallback(&mut env, &err_message)
112 }
113 }
114 }
115
116 #[no_mangle]
117 pub unsafe extern "C" fn nativeCreateTable(mut env: JNIEnv, _class: JClass, db_name: JString, table_name: JString, schema: JString) -> jstring {
118 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
119 Ok(s) => s,
120 Err(e) => {
121 eprintln!("Error converting db_name: {}", e);
122 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
123 }
124 };
125 let rust_table_name = match jstring_to_rust_string(&mut env, &table_name) {
126 Ok(s) => s,
127 Err(e) => {
128 eprintln!("Error converting table_name: {}", e);
129 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
130 }
131 };
132 let rust_schema = match jstring_to_rust_string(&mut env, &schema) {
133 Ok(s) => s,
134 Err(e) => {
135 eprintln!("Error converting schema: {}", e);
136 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
137 }
138 };
139
140 match create_table(&rust_db_name, &rust_table_name, &rust_schema) {
141 Ok(result) => {
142 let json_string = result.to_string();
143 return_jstring_or_fallback(&mut env, &json_string)
144 }
145 Err(err) => {
146 let err_message = format!("Failed to create table: {:?}", err);
147 return_jstring_or_fallback(&mut env, &err_message)
148 }
149 }
150 }
151
152 #[no_mangle]
153 pub unsafe extern "C" fn nativeListDatabases(mut env: JNIEnv, _class: JClass) -> jstring {
154 match list_databases() {
155 Ok(result) => {
156 let json_string = result.to_string();
157 return_jstring_or_fallback(&mut env, &json_string)
158 }
159 Err(err) => {
160 let err_message = format!("Failed to list databases: {:?}", err);
161 return_jstring_or_fallback(&mut env, &err_message)
162 }
163 }
164 }
165
166 #[no_mangle]
167 pub unsafe extern "C" fn nativeListTables(mut env: JNIEnv, _class: JClass, db_name: JString) -> jstring {
168 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
169 Ok(s) => s,
170 Err(e) => {
171 eprintln!("Error converting db_name: {}", e);
172 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
173 }
174 };
175
176 match list_tables(&rust_db_name) {
177 Ok(result) => {
178 let json_string = result.to_string();
179 return_jstring_or_fallback(&mut env, &json_string)
180 }
181 Err(err) => {
182 let err_message = format!("Failed to list tables: {:?}", err);
183 return_jstring_or_fallback(&mut env, &err_message)
184 }
185 }
186 }
187
188 #[no_mangle]
189 pub unsafe extern "C" fn nativeDeleteDatabase(mut env: JNIEnv, _class: JClass, db_name: JString) -> jstring {
190 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
191 Ok(s) => s,
192 Err(e) => {
193 eprintln!("Error converting db_name: {}", e);
194 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
195 }
196 };
197
198 match delete_database(&rust_db_name) {
199 Ok(result) => {
200 let json_string = result.to_string();
201 return_jstring_or_fallback(&mut env, &json_string)
202 }
203 Err(err) => {
204 let err_message = format!("Failed to delete database: {:?}", err);
205 return_jstring_or_fallback(&mut env, &err_message)
206 }
207 }
208 }
209
210 #[no_mangle]
211 pub unsafe extern "C" fn nativeDeleteTable(mut env: JNIEnv, _class: JClass, db_name: JString, table_name: JString) -> jstring {
212 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
213 Ok(s) => s,
214 Err(e) => {
215 eprintln!("Error converting db_name: {}", e);
216 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
217 }
218 };
219 let rust_table_name = match jstring_to_rust_string(&mut env, &table_name) {
220 Ok(s) => s,
221 Err(e) => {
222 eprintln!("Error converting table_name: {}", e);
223 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
224 }
225 };
226
227 match delete_table(&rust_db_name, &rust_table_name) {
228 Ok(result) => {
229 let json_string = result.to_string();
230 return_jstring_or_fallback(&mut env, &json_string)
231 }
232 Err(err) => {
233 let err_message = format!("Failed to delete table: {:?}", err);
234 return_jstring_or_fallback(&mut env, &err_message)
235 }
236 }
237 }
238
239 #[no_mangle]
240 pub unsafe extern "C" fn nativeInsert(mut env: JNIEnv, _class: JClass, db_name: JString, table_name: JString, json_data: JString) -> jstring {
241 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
242 Ok(s) => s,
243 Err(e) => {
244 eprintln!("Error converting db_name: {}", e);
245 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
246 }
247 };
248 let rust_table_name = match jstring_to_rust_string(&mut env, &table_name) {
249 Ok(s) => s,
250 Err(e) => {
251 eprintln!("Error converting table_name: {}", e);
252 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
253 }
254 };
255 let rust_json_data = match jstring_to_rust_string(&mut env, &json_data) {
256 Ok(s) => s,
257 Err(e) => {
258 eprintln!("Error converting json_data: {}", e);
259 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
260 }
261 };
262
263 match insert(&rust_db_name, &rust_table_name, &rust_json_data) {
264 Ok(result) => {
265 let json_string = result.to_string();
266 return_jstring_or_fallback(&mut env, &json_string)
267 }
268 Err(e) => {
269 let error_message = format!("Error writing JSON data to Parquet file: {:?}", e);
270 return_jstring_or_fallback(&mut env, &error_message)
271 }
272 }
273 }
274
275 fn get_date_range_value(env: &mut JNIEnv, date_range: &JObject, key: &str) -> Result<String, String> {
276 let j_key: JString = env
278 .new_string(key)
279 .map_err(|e| format!("Failed to create key string '{}': {:?}", key, e))?;
280
281 let j_key_obj: JObject = j_key.into();
283
284 let method_name = "get";
286 let method_sig = "(Ljava/lang/Object;)Ljava/lang/Object;";
287
288 let j_value = env
290 .call_method(
291 date_range, method_name,
293 method_sig,
294 &[JValue::from(&j_key_obj)], )
296 .map_err(|e| format!("Failed to call get method on date_range map: {:?}", e))?
297 .l() .map_err(|e| format!("Invalid value returned from get method for key '{}': {:?}", key, e))?;
299
300 let rust_value: String = env
302 .get_string(&JString::from(j_value))
303 .map_err(|e| format!("Failed to convert Java String to Rust String for key '{}': {:?}", key, e))?
304 .into();
305
306 Ok(rust_value)
307 }
308
309 #[no_mangle]
310 pub unsafe extern "C" fn nativeQuery(
311 mut env: JNIEnv,
312 _class: JClass,
313 db_name: JString,
314 sql_query: JString,
315 username: JString,
316 limit_partitions: jint,
317 ) -> jstring {
318 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
320 Ok(s) => s,
321 Err(e) => {
322 eprintln!("Error converting db_name: {}", e);
323 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
324 }
325 };
326 let rust_sql_query = match jstring_to_rust_string(&mut env, &sql_query) {
327 Ok(s) => s,
328 Err(e) => {
329 eprintln!("Error converting sql_query: {}", e);
330 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
331 }
332 };
333 let rust_username: Option<String> = if username.is_null() {
334 None
335 } else {
336 match jstring_to_rust_string(&mut env, &username) {
337 Ok(s) => Some(s),
338 Err(e) => {
339 eprintln!("Error converting username: {}", e);
340 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
341 }
342 }
343 };
344
345 let rust_limit_partitions = if limit_partitions > 0 { Some(limit_partitions as usize) } else { None };
347
348 match RUNTIME.block_on(query(&rust_db_name, &rust_sql_query, rust_username.as_deref(), rust_limit_partitions)) {
350 Ok(result) => {
351 let json_string = result.to_string();
352 return_jstring_or_fallback(&mut env, &json_string)
353 }
354 Err(e) => {
355 let error_message = format!("Error querying Parquet files: {:?}", e);
356 return_jstring_or_fallback(&mut env, &error_message)
357 }
358 }
359 }
360
361 #[no_mangle]
363 pub unsafe extern "C" fn nativeInitBucket(
364 mut env: JNIEnv,
365 _class: JClass,
366 bucket_endpoint: JString,
367 bucket_name: JString,
368 access_key_id: JString,
369 secret_access_key: JString,
370 bucket_region: JString,
371 ) -> jstring {
372 use zeroize::Zeroize;
373
374 let rust_bucket_endpoint = match jstring_to_rust_string(&mut env, &bucket_endpoint) {
375 Ok(s) => s,
376 Err(e) => {
377 eprintln!("Error converting bucket_endpoint: {}", e);
378 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
379 }
380 };
381 let rust_bucket_name = match jstring_to_rust_string(&mut env, &bucket_name) {
382 Ok(s) => s,
383 Err(e) => {
384 eprintln!("Error converting bucket_name: {}", e);
385 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
386 }
387 };
388
389 let mut rust_access_key_id = match jstring_to_rust_string(&mut env, &access_key_id) {
391 Ok(s) => s,
392 Err(e) => {
393 eprintln!("Error converting access_key_id: {}", e);
394 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
395 }
396 };
397 let mut rust_secret_access_key = match jstring_to_rust_string(&mut env, &secret_access_key) {
398 Ok(s) => s,
399 Err(e) => {
400 eprintln!("Error converting secret_access_key: {}", e);
401 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
402 }
403 };
404 let rust_bucket_region = match jstring_to_rust_string(&mut env, &bucket_region) {
405 Ok(s) => s,
406 Err(e) => {
407 eprintln!("Error converting bucket_region: {}", e);
408 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
409 }
410 };
411
412 let result = init_bucket(
414 &rust_bucket_endpoint,
415 &rust_bucket_name,
416 &rust_access_key_id,
417 &rust_secret_access_key,
418 &rust_bucket_region,
419 );
420
421 rust_access_key_id.zeroize();
423 rust_secret_access_key.zeroize();
424
425 match result {
426 Ok(result_value) => {
427 let json_string = result_value.to_string();
428 return_jstring_or_fallback(&mut env, &json_string)
429 }
430 Err(err) => {
431 let err_message = format!("Failed to initialize S3 bucket: {:?}", err);
432 return_jstring_or_fallback(&mut env, &err_message)
433 }
434 }
435 }
436
437 #[no_mangle]
438 pub unsafe extern "C" fn nativeCloudSyncParquet(
439 mut env: JNIEnv,
440 _class: JClass,
441 db_name: JString,
442 table_name: JString,
443 date_range: JObject,
444 username: JString,
445 ) -> jstring {
446 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
447 Ok(s) => s,
448 Err(e) => {
449 eprintln!("Error converting db_name: {}", e);
450 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
451 }
452 };
453 let rust_table_name = match jstring_to_rust_string(&mut env, &table_name) {
454 Ok(s) => s,
455 Err(e) => {
456 eprintln!("Error converting table_name: {}", e);
457 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
458 }
459 };
460
461 let mut rust_date_range: HashMap<&str, &str> = HashMap::new();
462 let rust_start = match get_date_range_value(&mut env, &date_range, "start") {
463 Ok(s) => s,
464 Err(e) => {
465 eprintln!("Error getting start date: {}", e);
466 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
467 }
468 };
469 let rust_end = match get_date_range_value(&mut env, &date_range, "end") {
470 Ok(s) => s,
471 Err(e) => {
472 eprintln!("Error getting end date: {}", e);
473 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
474 }
475 };
476 rust_date_range.insert("start_date", &rust_start);
477 rust_date_range.insert("end_date", &rust_end);
478
479 let rust_username: Option<String> = if username.is_null() {
480 None
481 } else {
482 match jstring_to_rust_string(&mut env, &username) {
483 Ok(s) => Some(s),
484 Err(e) => {
485 eprintln!("Error converting username: {}", e);
486 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
487 }
488 }
489 };
490
491 match RUNTIME.block_on(cloud_sync_parquet(
492 &rust_db_name,
493 &rust_table_name,
494 rust_date_range,
495 rust_username.as_deref(),
496 )) {
497 Ok(result) => {
498 let json_string = result.to_string();
499 return_jstring_or_fallback(&mut env, &json_string)
500 }
501 Err(err) => {
502 let err_message = format!("Failed fetch s3 parquet files: {:?}", err);
503 return_jstring_or_fallback(&mut env, &err_message)
504 }
505 }
506 }
507
508 #[no_mangle]
509 pub unsafe extern "C" fn nativeCloudSinkParquet(mut env: JNIEnv, _class: JClass, db_name: JString, table_name: JString) -> jstring {
510 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
511 Ok(s) => s,
512 Err(e) => {
513 eprintln!("Error converting db_name: {}", e);
514 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
515 }
516 };
517 let rust_table_name = match jstring_to_rust_string(&mut env, &table_name) {
518 Ok(s) => s,
519 Err(e) => {
520 eprintln!("Error converting table_name: {}", e);
521 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
522 }
523 };
524
525 match RUNTIME.block_on(cloud_sink_parquet(&rust_db_name, &rust_table_name)) {
526 Ok(result) => {
527 let json_string = result.to_string();
528 return_jstring_or_fallback(&mut env, &json_string)
529 }
530 Err(err) => {
531 let err_message = format!("Failed sink parquet files: {:?}", err);
532 return_jstring_or_fallback(&mut env, &err_message)
533 }
534 }
535 }
536
537 #[no_mangle]
538 pub unsafe extern "C" fn nativeCloudFetchParquet(
539 mut env: JNIEnv,
540 _class: JClass,
541 username: JString,
542 db_name: JString,
543 table_name: JString,
544 date_range: JObject,
545 ) -> jstring {
546 let rust_username = match jstring_to_rust_string(&mut env, &username) {
547 Ok(s) => s,
548 Err(e) => {
549 eprintln!("Error converting username: {}", e);
550 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
551 }
552 };
553 let rust_db_name = match jstring_to_rust_string(&mut env, &db_name) {
554 Ok(s) => s,
555 Err(e) => {
556 eprintln!("Error converting db_name: {}", e);
557 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
558 }
559 };
560 let rust_table_name = match jstring_to_rust_string(&mut env, &table_name) {
561 Ok(s) => s,
562 Err(e) => {
563 eprintln!("Error converting table_name: {}", e);
564 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
565 }
566 };
567
568 let mut rust_date_range: HashMap<&str, &str> = HashMap::new();
569 let rust_start = match get_date_range_value(&mut env, &date_range, "start") {
570 Ok(s) => s,
571 Err(e) => {
572 eprintln!("Error getting start date: {}", e);
573 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
574 }
575 };
576 let rust_end = match get_date_range_value(&mut env, &date_range, "end") {
577 Ok(s) => s,
578 Err(e) => {
579 eprintln!("Error getting end date: {}", e);
580 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
581 }
582 };
583 rust_date_range.insert("start_date", &rust_start);
584 rust_date_range.insert("end_date", &rust_end);
585
586 match RUNTIME.block_on(cloud_fetch_parquet(&rust_username, &rust_db_name, &rust_table_name, rust_date_range)) {
587 Ok(result) => {
588 let json_string = result.to_string();
589 return_jstring_or_fallback(&mut env, &json_string)
590 }
591 Err(err) => {
592 let err_message = format!("Failed fetch s3 parquet files: {:?}", err);
593 return_jstring_or_fallback(&mut env, &err_message)
594 }
595 }
596 }
597
598 #[no_mangle]
599 pub unsafe extern "C" fn nativeCloudFetchParquetBatch(
600 mut env: JNIEnv,
601 _class: JClass,
602 usernames: JObject,
603 db_names: JObject,
604 table_names: JObject,
605 date_range: JObject,
606 ) -> jstring {
607 let mut rust_usernames: Vec<String> = Vec::new();
609 let mut rust_db_names: Vec<String> = Vec::new();
610 let mut rust_table_names: Vec<String> = Vec::new();
611
612 let usernames_array: jni::objects::JObjectArray = usernames.into();
614 let usernames_length = match env.get_array_length(&usernames_array) {
615 Ok(len) => len,
616 Err(e) => {
617 eprintln!("Error getting usernames array length: {:?}", e);
618 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "Failed to get usernames array length: {:?}"}}"#, e));
619 }
620 };
621 for i in 0..usernames_length {
622 let element = match env.get_object_array_element(&usernames_array, i) {
623 Ok(elem) => elem,
624 Err(e) => {
625 eprintln!("Error getting usernames array element at index {}: {:?}", i, e);
626 return return_jstring_or_fallback(
627 &mut env,
628 &format!(r#"{{"error": "Failed to get usernames array element at index {}: {:?}"}}"#, i, e),
629 );
630 }
631 };
632 let j_string: JString = element.into();
633 let rust_string = match jstring_to_rust_string(&mut env, &j_string) {
634 Ok(s) => s,
635 Err(e) => {
636 eprintln!("Error converting username at index {}: {}", i, e);
637 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "Failed to convert username at index {}: {}"}}"#, i, e));
638 }
639 };
640 rust_usernames.push(rust_string);
641 }
642
643 let db_names_array: jni::objects::JObjectArray = db_names.into();
645 let db_names_length = match env.get_array_length(&db_names_array) {
646 Ok(len) => len,
647 Err(e) => {
648 eprintln!("Error getting db_names array length: {:?}", e);
649 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "Failed to get db_names array length: {:?}"}}"#, e));
650 }
651 };
652 for i in 0..db_names_length {
653 let element = match env.get_object_array_element(&db_names_array, i) {
654 Ok(elem) => elem,
655 Err(e) => {
656 eprintln!("Error getting db_names array element at index {}: {:?}", i, e);
657 return return_jstring_or_fallback(
658 &mut env,
659 &format!(r#"{{"error": "Failed to get db_names array element at index {}: {:?}"}}"#, i, e),
660 );
661 }
662 };
663 let j_string: JString = element.into();
664 let rust_string = match jstring_to_rust_string(&mut env, &j_string) {
665 Ok(s) => s,
666 Err(e) => {
667 eprintln!("Error converting db_name at index {}: {}", i, e);
668 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "Failed to convert db_name at index {}: {}"}}"#, i, e));
669 }
670 };
671 rust_db_names.push(rust_string);
672 }
673
674 let table_names_array: jni::objects::JObjectArray = table_names.into();
676 let table_names_length = match env.get_array_length(&table_names_array) {
677 Ok(len) => len,
678 Err(e) => {
679 eprintln!("Error getting table_names array length: {:?}", e);
680 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "Failed to get table_names array length: {:?}"}}"#, e));
681 }
682 };
683 for i in 0..table_names_length {
684 let element = match env.get_object_array_element(&table_names_array, i) {
685 Ok(elem) => elem,
686 Err(e) => {
687 eprintln!("Error getting table_names array element at index {}: {:?}", i, e);
688 return return_jstring_or_fallback(
689 &mut env,
690 &format!(r#"{{"error": "Failed to get table_names array element at index {}: {:?}"}}"#, i, e),
691 );
692 }
693 };
694 let j_string: JString = element.into();
695 let rust_string = match jstring_to_rust_string(&mut env, &j_string) {
696 Ok(s) => s,
697 Err(e) => {
698 eprintln!("Error converting table_name at index {}: {}", i, e);
699 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "Failed to convert table_name at index {}: {}"}}"#, i, e));
700 }
701 };
702 rust_table_names.push(rust_string);
703 }
704
705 let mut rust_date_range: HashMap<&str, &str> = HashMap::new();
707 let rust_start = match get_date_range_value(&mut env, &date_range, "start") {
708 Ok(s) => s,
709 Err(e) => {
710 eprintln!("Error getting start date: {}", e);
711 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
712 }
713 };
714 let rust_end = match get_date_range_value(&mut env, &date_range, "end") {
715 Ok(s) => s,
716 Err(e) => {
717 eprintln!("Error getting end date: {}", e);
718 return return_jstring_or_fallback(&mut env, &format!(r#"{{"error": "{}"}}"#, e));
719 }
720 };
721 rust_date_range.insert("start_date", &rust_start);
722 rust_date_range.insert("end_date", &rust_end);
723
724 let usernames_refs: Vec<&str> = rust_usernames.iter().map(|s| s.as_str()).collect();
726 let db_names_refs: Vec<&str> = rust_db_names.iter().map(|s| s.as_str()).collect();
727 let table_names_refs: Vec<&str> = rust_table_names.iter().map(|s| s.as_str()).collect();
728
729 match RUNTIME.block_on(cloud_fetch_parquet_batch(
730 &usernames_refs,
731 &db_names_refs,
732 &table_names_refs,
733 rust_date_range,
734 )) {
735 Ok(result) => {
736 let json_string = result.to_string();
737 return_jstring_or_fallback(&mut env, &json_string)
738 }
739 Err(err) => {
740 let err_message = format!("Failed to batch fetch s3 parquet files: {:?}", err);
741 return_jstring_or_fallback(&mut env, &err_message)
742 }
743 }
744 }
745
746 #[no_mangle]
747 pub extern "C" fn JNI_OnLoad(vm: jni::JavaVM, _reserved: *mut std::ffi::c_void) -> jni::sys::jint {
748 let mut env = match vm.get_env() {
749 Ok(e) => e,
750 Err(e) => {
751 eprintln!("CRITICAL: Failed to get JNIEnv: {:?}", e);
752 return jni::sys::JNI_ERR;
753 }
754 };
755
756 let activity_thread = match env.find_class("android/app/ActivityThread") {
758 Ok(c) => c,
759 Err(e) => {
760 eprintln!("CRITICAL: Failed to find ActivityThread: {:?}", e);
761 return jni::sys::JNI_ERR;
762 }
763 };
764 let current_activity_thread = match env.call_static_method(activity_thread, "currentActivityThread", "()Landroid/app/ActivityThread;", &[]) {
765 Ok(result) => match result.l() {
766 Ok(obj) => obj,
767 Err(e) => {
768 eprintln!("CRITICAL: Failed to convert currentActivityThread to object: {:?}", e);
769 return jni::sys::JNI_ERR;
770 }
771 },
772 Err(e) => {
773 eprintln!("CRITICAL: Failed to get currentActivityThread: {:?}", e);
774 return jni::sys::JNI_ERR;
775 }
776 };
777 let app_context = match env.call_method(current_activity_thread, "getApplication", "()Landroid/app/Application;", &[]) {
778 Ok(result) => match result.l() {
779 Ok(obj) => obj,
780 Err(e) => {
781 eprintln!("CRITICAL: Failed to convert Application context to object: {:?}", e);
782 return jni::sys::JNI_ERR;
783 }
784 },
785 Err(e) => {
786 eprintln!("CRITICAL: Failed to get Application context: {:?}", e);
787 return jni::sys::JNI_ERR;
788 }
789 };
790
791 let package_name_obj = match env.call_method(app_context, "getPackageName", "()Ljava/lang/String;", &[]) {
793 Ok(result) => match result.l() {
794 Ok(obj) => obj,
795 Err(e) => {
796 eprintln!("CRITICAL: Failed to convert package name to object: {:?}", e);
797 return jni::sys::JNI_ERR;
798 }
799 },
800 Err(e) => {
801 eprintln!("CRITICAL: Failed to get package name: {:?}", e);
802 return jni::sys::JNI_ERR;
803 }
804 };
805 let package_name: String = match env.get_string(&JString::from(package_name_obj)) {
806 Ok(jstr) => jstr.into(),
807 Err(e) => {
808 eprintln!("CRITICAL: Failed to convert package name to Rust string: {:?}", e);
809 return jni::sys::JNI_ERR;
810 }
811 };
812
813 let class_name = format!("{}/TimonModule", package_name.replace(".", "/"));
815 let class = match env.find_class(&class_name) {
816 Ok(c) => c,
817 Err(e) => {
818 eprintln!("CRITICAL: Failed to find class '{}': {:?}", class_name, e);
819 return jni::sys::JNI_ERR;
820 }
821 };
822
823 let methods = [
824 NativeMethod {
825 name: "nativeInitTimon".into(),
826 sig: "(Ljava/lang/String;ILjava/lang/String;)Ljava/lang/String;".into(),
827 fn_ptr: nativeInitTimon as *mut c_void,
828 },
829 NativeMethod {
830 name: "nativeCreateDatabase".into(),
831 sig: "(Ljava/lang/String;)Ljava/lang/String;".into(),
832 fn_ptr: nativeCreateDatabase as *mut c_void,
833 },
834 NativeMethod {
835 name: "nativeCreateTable".into(),
836 sig: "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;".into(),
837 fn_ptr: nativeCreateTable as *mut c_void,
838 },
839 NativeMethod {
840 name: "nativeListDatabases".into(),
841 sig: "()Ljava/lang/String;".into(),
842 fn_ptr: nativeListDatabases as *mut c_void,
843 },
844 NativeMethod {
845 name: "nativeListTables".into(),
846 sig: "(Ljava/lang/String;)Ljava/lang/String;".into(),
847 fn_ptr: nativeListTables as *mut c_void,
848 },
849 NativeMethod {
850 name: "nativeDeleteDatabase".into(),
851 sig: "(Ljava/lang/String;)Ljava/lang/String;".into(),
852 fn_ptr: nativeDeleteDatabase as *mut c_void,
853 },
854 NativeMethod {
855 name: "nativeDeleteTable".into(),
856 sig: "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;".into(),
857 fn_ptr: nativeDeleteTable as *mut c_void,
858 },
859 NativeMethod {
860 name: "nativeInsert".into(),
861 sig: "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;".into(),
862 fn_ptr: nativeInsert as *mut c_void,
863 },
864 NativeMethod {
865 name: "nativeQuery".into(),
866 sig: "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;I)Ljava/lang/String;".into(),
867 fn_ptr: nativeQuery as *mut c_void,
868 },
869 NativeMethod {
870 name: "nativeInitBucket".into(),
871 sig: "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;".into(),
872 fn_ptr: nativeInitBucket as *mut c_void,
873 },
874 NativeMethod {
875 name: "nativeCloudSyncParquet".into(),
876 sig: "(Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;Ljava/lang/String;)Ljava/lang/String;".into(),
877 fn_ptr: nativeCloudSyncParquet as *mut c_void,
878 },
879 NativeMethod {
880 name: "nativeCloudSinkParquet".into(),
881 sig: "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;".into(),
882 fn_ptr: nativeCloudSinkParquet as *mut c_void,
883 },
884 NativeMethod {
885 name: "nativeCloudFetchParquet".into(),
886 sig: "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/util/Map;)Ljava/lang/String;".into(),
887 fn_ptr: nativeCloudFetchParquet as *mut c_void,
888 },
889 NativeMethod {
890 name: "nativeCloudFetchParquetBatch".into(),
891 sig: "([Ljava/lang/String;[Ljava/lang/String;[Ljava/lang/String;Ljava/util/Map;)Ljava/lang/String;".into(),
892 fn_ptr: nativeCloudFetchParquetBatch as *mut c_void,
893 },
894 ];
895
896 match env.register_native_methods(class, &methods) {
897 Ok(_) => jni::sys::JNI_VERSION_1_8,
898 Err(e) => {
899 eprintln!("CRITICAL: Failed to register native methods: {:?}", e);
900 jni::sys::JNI_ERR
901 }
902 }
903 }
904}
905
906#[cfg(target_os = "ios")]
907pub mod ios {
908 use crate::timon_engine::{
909 cloud_fetch_parquet, cloud_fetch_parquet_batch, cloud_sink_parquet, cloud_sync_parquet, create_database, create_table, delete_database,
910 delete_table, init_bucket, init_timon, insert, list_databases, list_tables, query,
911 };
912 use libc::c_char;
913 use std::collections::HashMap;
914 use std::ffi::{CStr, CString};
915 use std::sync::LazyLock;
916 use tokio::runtime::Runtime;
917
918 static RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
920 Runtime::new().unwrap_or_else(|e| {
921 eprintln!("CRITICAL: Failed to create tokio runtime for iOS interface: {:?}", e);
922 std::process::abort(); })
924 });
925
926 unsafe fn c_str_to_string(c_str: *const c_char) -> Result<String, String> {
928 if c_str.is_null() {
929 Err("Null pointer received".to_string())
930 } else {
931 CStr::from_ptr(c_str)
932 .to_str()
933 .map(|s| s.to_string())
934 .map_err(|e| format!("Failed to convert C string to Rust string: {:?}", e))
935 }
936 }
937
938 fn string_to_c_str(s: String) -> *mut c_char {
940 let safe = s.replace('\0', "");
941 CString::new(safe).expect("CString::new cannot fail after removing null bytes").into_raw()
942 }
943
944 #[no_mangle]
945 pub extern "C" fn rust_string_free(s: *mut c_char) {
946 if !s.is_null() {
947 unsafe {
948 CString::from_raw(s);
949 }
950 }
951 }
952
953 #[no_mangle]
954 pub extern "C" fn nativeInitTimon(storage_path: *const c_char, bucket_interval: u32, username: *const c_char) -> *mut c_char {
955 unsafe {
956 match (c_str_to_string(storage_path), c_str_to_string(username)) {
957 (Ok(rust_storage_path), Ok(rust_username)) => match init_timon(&rust_storage_path, bucket_interval, &rust_username) {
958 Ok(result) => {
959 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
960 string_to_c_str(json_string)
961 }
962 Err(err) => {
963 let err_message = serde_json::json!({ "error": format!("Failed to initialize Timon: {:?}", err) }).to_string();
964 string_to_c_str(err_message)
965 }
966 },
967 (Err(err), _) | (_, Err(err)) => {
968 let err_message = serde_json::json!({ "error": err }).to_string();
969 string_to_c_str(err_message)
970 }
971 }
972 }
973 }
974
975 #[no_mangle]
976 pub extern "C" fn nativeCreateDatabase(db_name: *const c_char) -> *mut c_char {
977 unsafe {
978 match c_str_to_string(db_name) {
979 Ok(rust_db_name) => match create_database(&rust_db_name) {
980 Ok(result) => {
981 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
982 string_to_c_str(json_string)
983 }
984 Err(err) => {
985 let err_message = serde_json::json!({ "error": format!("Failed to create database: {:?}", err) }).to_string();
986 string_to_c_str(err_message)
987 }
988 },
989 Err(err) => {
990 let err_message = serde_json::json!({ "error": err }).to_string();
991 string_to_c_str(err_message)
992 }
993 }
994 }
995 }
996
997 #[no_mangle]
998 pub extern "C" fn nativeCreateTable(db_name: *const c_char, table_name: *const c_char, schema: *const c_char) -> *mut c_char {
999 unsafe {
1000 match (c_str_to_string(db_name), c_str_to_string(table_name), c_str_to_string(schema)) {
1001 (Ok(rust_db_name), Ok(rust_table_name), Ok(rust_schema)) => match create_table(&rust_db_name, &rust_table_name, &rust_schema) {
1002 Ok(result) => {
1003 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1004 string_to_c_str(json_string)
1005 }
1006 Err(err) => {
1007 let err_message = serde_json::json!({ "error": format!("Failed to create table: {:?}", err) }).to_string();
1008 string_to_c_str(err_message)
1009 }
1010 },
1011 (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
1012 let err_message = serde_json::json!({ "error": e }).to_string();
1013 string_to_c_str(err_message)
1014 }
1015 }
1016 }
1017 }
1018
1019 #[no_mangle]
1020 pub extern "C" fn nativeListDatabases() -> *mut c_char {
1021 match list_databases() {
1022 Ok(result) => {
1023 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "[]".to_string());
1024 string_to_c_str(json_string)
1025 }
1026 Err(err) => {
1027 let err_message = serde_json::json!({ "error": format!("Failed to list databases: {:?}", err) }).to_string();
1028 string_to_c_str(err_message)
1029 }
1030 }
1031 }
1032
1033 #[no_mangle]
1034 pub extern "C" fn nativeListTables(db_name: *const c_char) -> *mut c_char {
1035 unsafe {
1036 match c_str_to_string(db_name) {
1037 Ok(rust_db_name) => match list_tables(&rust_db_name) {
1038 Ok(result) => {
1039 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "[]".to_string());
1040 string_to_c_str(json_string)
1041 }
1042 Err(err) => {
1043 let err_message = serde_json::json!({ "error": format!("Failed to list tables: {:?}", err) }).to_string();
1044 string_to_c_str(err_message)
1045 }
1046 },
1047 Err(err) => {
1048 let err_message = serde_json::json!({ "error": err }).to_string();
1049 string_to_c_str(err_message)
1050 }
1051 }
1052 }
1053 }
1054
1055 #[no_mangle]
1056 pub extern "C" fn nativeDeleteDatabase(db_name: *const c_char) -> *mut c_char {
1057 unsafe {
1058 match c_str_to_string(db_name) {
1059 Ok(rust_db_name) => match delete_database(&rust_db_name) {
1060 Ok(result) => {
1061 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1062 string_to_c_str(json_string)
1063 }
1064 Err(err) => {
1065 let err_message = serde_json::json!({ "error": format!("Failed to delete database: {:?}", err) }).to_string();
1066 string_to_c_str(err_message)
1067 }
1068 },
1069 Err(err) => {
1070 let err_message = serde_json::json!({ "error": err }).to_string();
1071 string_to_c_str(err_message)
1072 }
1073 }
1074 }
1075 }
1076
1077 #[no_mangle]
1078 pub extern "C" fn nativeDeleteTable(db_name: *const c_char, table_name: *const c_char) -> *mut c_char {
1079 unsafe {
1080 match (c_str_to_string(db_name), c_str_to_string(table_name)) {
1081 (Ok(rust_db_name), Ok(rust_table_name)) => match delete_table(&rust_db_name, &rust_table_name) {
1082 Ok(result) => {
1083 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1084 string_to_c_str(json_string)
1085 }
1086 Err(err) => {
1087 let err_message = serde_json::json!({ "error": format!("Failed to delete table: {:?}", err) }).to_string();
1088 string_to_c_str(err_message)
1089 }
1090 },
1091 (Err(e), _) | (_, Err(e)) => {
1092 let err_message = serde_json::json!({ "error": e }).to_string();
1093 string_to_c_str(err_message)
1094 }
1095 }
1096 }
1097 }
1098
1099 #[no_mangle]
1100 pub extern "C" fn nativeInsert(db_name: *const c_char, table_name: *const c_char, json_data: *const c_char) -> *mut c_char {
1101 unsafe {
1102 match (c_str_to_string(db_name), c_str_to_string(table_name), c_str_to_string(json_data)) {
1103 (Ok(rust_db_name), Ok(rust_table_name), Ok(rust_json_data)) => match insert(&rust_db_name, &rust_table_name, &rust_json_data) {
1104 Ok(result) => {
1105 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1106 string_to_c_str(json_string)
1107 }
1108 Err(err) => {
1109 let err_message = serde_json::json!({ "error": format!("Error writing JSON data to Parquet file: {:?}", err) }).to_string();
1110 string_to_c_str(err_message)
1111 }
1112 },
1113 _ => {
1114 let err_message = serde_json::json!({ "error": "Invalid arguments" }).to_string();
1115 string_to_c_str(err_message)
1116 }
1117 }
1118 }
1119 }
1120
1121 #[no_mangle]
1122 pub extern "C" fn nativeQuery(db_name: *const c_char, sql_query: *const c_char, username: *const c_char, limit_partitions: i32) -> *mut c_char {
1123 unsafe {
1124 match (c_str_to_string(db_name), c_str_to_string(sql_query), c_str_to_string(username).ok()) {
1125 (Ok(rust_db_name), Ok(rust_sql_query), rust_username) => {
1126 let rust_limit_partitions = if limit_partitions > 0 { Some(limit_partitions as usize) } else { None };
1128
1129 match RUNTIME.block_on(query(&rust_db_name, &rust_sql_query, rust_username.as_deref(), rust_limit_partitions)) {
1130 Ok(result) => {
1131 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "[]".to_string());
1132 string_to_c_str(json_string)
1133 }
1134 Err(err) => {
1135 let err_message = serde_json::json!({ "error": format!("Error querying Parquet files: {:?}", err) }).to_string();
1136 string_to_c_str(err_message)
1137 }
1138 }
1139 }
1140 _ => {
1141 let err_message = serde_json::json!({ "error": "Invalid arguments" }).to_string();
1142 string_to_c_str(err_message)
1143 }
1144 }
1145 }
1146 }
1147
1148 #[no_mangle]
1150 pub extern "C" fn nativeInitBucket(
1151 bucket_endpoint: *const c_char,
1152 bucket_name: *const c_char,
1153 access_key_id: *const c_char,
1154 secret_access_key: *const c_char,
1155 bucket_region: *const c_char,
1156 ) -> *mut c_char {
1157 unsafe {
1158 match (
1159 c_str_to_string(bucket_endpoint),
1160 c_str_to_string(bucket_name),
1161 c_str_to_string(access_key_id),
1162 c_str_to_string(secret_access_key),
1163 c_str_to_string(bucket_region),
1164 ) {
1165 (Ok(rust_bucket_endpoint), Ok(rust_bucket_name), Ok(rust_access_key_id), Ok(rust_secret_access_key), Ok(rust_bucket_region)) => {
1166 match init_bucket(
1167 &rust_bucket_endpoint,
1168 &rust_bucket_name,
1169 &rust_access_key_id,
1170 &rust_secret_access_key,
1171 &rust_bucket_region,
1172 ) {
1173 Ok(result) => {
1174 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1175 string_to_c_str(json_string)
1176 }
1177 Err(err) => {
1178 let err_message = serde_json::json!({ "error": format!("Failed to initialize S3 bucket: {:?}", err) }).to_string();
1179 string_to_c_str(err_message)
1180 }
1181 }
1182 }
1183 _ => {
1184 let err_message = serde_json::json!({ "error": "Invalid arguments" }).to_string();
1185 string_to_c_str(err_message)
1186 }
1187 }
1188 }
1189 }
1190
1191 #[no_mangle]
1192 pub extern "C" fn nativeCloudSyncParquet(
1193 db_name: *const c_char,
1194 table_name: *const c_char,
1195 date_range_json: *const c_char,
1196 username: *const c_char,
1197 ) -> *mut c_char {
1198 unsafe {
1199 match (
1200 c_str_to_string(db_name),
1201 c_str_to_string(table_name),
1202 c_str_to_string(date_range_json),
1203 c_str_to_string(username).ok(),
1204 ) {
1205 (Ok(rust_db_name), Ok(rust_table_name), Ok(rust_date_range_json), rust_username) => {
1206 let rust_date_range: HashMap<String, String> = match serde_json::from_str(&rust_date_range_json) {
1208 Ok(map) => map,
1209 Err(e) => {
1210 let err_message = serde_json::json!({
1211 "error": format!("Failed to parse date_range_json: {}. Input was: '{}'", e, rust_date_range_json)
1212 })
1213 .to_string();
1214 return string_to_c_str(err_message);
1215 }
1216 };
1217
1218 let start_date = rust_date_range.get("start").cloned().unwrap_or_else(|| {
1219 println!("Warning: 'start' key not found in date_range_json, using default");
1220 "1970-01-01".to_string()
1221 });
1222
1223 let end_date = rust_date_range.get("end").cloned().unwrap_or_else(|| {
1224 println!("Warning: 'end' key not found in date_range_json, using default");
1225 "1970-01-02".to_string()
1226 });
1227
1228 let mut date_range_map = HashMap::new();
1229 date_range_map.insert("start_date", start_date.as_str());
1230 date_range_map.insert("end_date", end_date.as_str());
1231
1232 match RUNTIME.block_on(cloud_sync_parquet(
1233 &rust_db_name,
1234 &rust_table_name,
1235 date_range_map,
1236 rust_username.as_deref(),
1237 )) {
1238 Ok(result) => {
1239 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1240 string_to_c_str(json_string)
1241 }
1242 Err(err) => {
1243 let err_message = serde_json::json!({ "error": format!("Error syncing parquet files: {}", err) }).to_string();
1244 string_to_c_str(err_message)
1245 }
1246 }
1247 }
1248 _ => {
1249 let err_message = serde_json::json!({
1250 "error": "Invalid arguments to nativeCloudSyncParquet function. Ensure all parameters are valid strings."
1251 })
1252 .to_string();
1253 string_to_c_str(err_message)
1254 }
1255 }
1256 }
1257 }
1258
1259 #[no_mangle]
1260 pub extern "C" fn nativeCloudSinkParquet(db_name: *const c_char, table_name: *const c_char) -> *mut c_char {
1261 unsafe {
1262 match (c_str_to_string(db_name), c_str_to_string(table_name)) {
1263 (Ok(rust_db_name), Ok(rust_table_name)) => match RUNTIME.block_on(cloud_sink_parquet(&rust_db_name, &rust_table_name)) {
1264 Ok(result) => {
1265 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1266 string_to_c_str(json_string)
1267 }
1268 Err(err) => {
1269 let err_message = serde_json::json!({ "error": format!("Failed to sink Parquet files: {:?}", err) }).to_string();
1270 string_to_c_str(err_message)
1271 }
1272 },
1273 _ => {
1274 let err_message = serde_json::json!({ "error": "Invalid arguments" }).to_string();
1275 string_to_c_str(err_message)
1276 }
1277 }
1278 }
1279 }
1280
1281 #[no_mangle]
1282 pub extern "C" fn nativeCloudFetchParquet(
1283 username: *const c_char,
1284 db_name: *const c_char,
1285 table_name: *const c_char,
1286 date_range_json: *const c_char,
1287 ) -> *mut c_char {
1288 unsafe {
1289 match (
1290 c_str_to_string(username),
1291 c_str_to_string(db_name),
1292 c_str_to_string(table_name),
1293 c_str_to_string(date_range_json),
1294 ) {
1295 (Ok(rust_username), Ok(rust_db_name), Ok(rust_table_name), Ok(rust_date_range_json)) => {
1296 let rust_date_range: HashMap<String, String> = match serde_json::from_str(&rust_date_range_json) {
1298 Ok(map) => map,
1299 Err(e) => {
1300 let err_message = serde_json::json!({
1301 "error": format!("Failed to parse date_range_json: {}. Input was: '{}'", e, rust_date_range_json)
1302 })
1303 .to_string();
1304 return string_to_c_str(err_message);
1305 }
1306 };
1307
1308 let start_date = rust_date_range.get("start").cloned().unwrap_or_else(|| {
1309 println!("Warning: 'start' key not found in date_range_json, using default");
1310 "1970-01-01".to_string()
1311 });
1312
1313 let end_date = rust_date_range.get("end").cloned().unwrap_or_else(|| {
1314 println!("Warning: 'end' key not found in date_range_json, using default");
1315 "1970-01-02".to_string()
1316 });
1317
1318 let mut date_range_map = HashMap::new();
1319 date_range_map.insert("start_date", start_date.as_str());
1320 date_range_map.insert("end_date", end_date.as_str());
1321
1322 match RUNTIME.block_on(cloud_fetch_parquet(&rust_username, &rust_db_name, &rust_table_name, date_range_map)) {
1323 Ok(result) => {
1324 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1325 string_to_c_str(json_string)
1326 }
1327 Err(err) => {
1328 let err_message = serde_json::json!({ "error": format!("Failed to fetch s3 Parquet files: {}", err) }).to_string();
1329 string_to_c_str(err_message)
1330 }
1331 }
1332 }
1333 _ => {
1334 let err_message = serde_json::json!({
1335 "error": "Invalid arguments to nativeCloudFetchParquet function. Ensure all parameters are valid strings."
1336 })
1337 .to_string();
1338 string_to_c_str(err_message)
1339 }
1340 }
1341 }
1342 }
1343
1344 #[no_mangle]
1345 pub extern "C" fn nativeCloudFetchParquetBatch(
1346 usernames_json: *const c_char,
1347 db_names_json: *const c_char,
1348 table_names_json: *const c_char,
1349 date_range_json: *const c_char,
1350 ) -> *mut c_char {
1351 unsafe {
1352 match (
1353 c_str_to_string(usernames_json),
1354 c_str_to_string(db_names_json),
1355 c_str_to_string(table_names_json),
1356 c_str_to_string(date_range_json),
1357 ) {
1358 (Ok(rust_usernames_json), Ok(rust_db_names_json), Ok(rust_table_names_json), Ok(rust_date_range_json)) => {
1359 let rust_usernames: Vec<String> = match serde_json::from_str(&rust_usernames_json) {
1361 Ok(vec) => vec,
1362 Err(e) => {
1363 let err_message = serde_json::json!({
1364 "error": format!("Failed to parse usernames_json: {}. Input was: '{}'", e, rust_usernames_json)
1365 })
1366 .to_string();
1367 return string_to_c_str(err_message);
1368 }
1369 };
1370
1371 let rust_db_names: Vec<String> = match serde_json::from_str(&rust_db_names_json) {
1372 Ok(vec) => vec,
1373 Err(e) => {
1374 let err_message = serde_json::json!({
1375 "error": format!("Failed to parse db_names_json: {}. Input was: '{}'", e, rust_db_names_json)
1376 })
1377 .to_string();
1378 return string_to_c_str(err_message);
1379 }
1380 };
1381
1382 let rust_table_names: Vec<String> = match serde_json::from_str(&rust_table_names_json) {
1383 Ok(vec) => vec,
1384 Err(e) => {
1385 let err_message = serde_json::json!({
1386 "error": format!("Failed to parse table_names_json: {}. Input was: '{}'", e, rust_table_names_json)
1387 })
1388 .to_string();
1389 return string_to_c_str(err_message);
1390 }
1391 };
1392
1393 let rust_date_range: HashMap<String, String> = match serde_json::from_str(&rust_date_range_json) {
1395 Ok(map) => map,
1396 Err(e) => {
1397 let err_message = serde_json::json!({
1398 "error": format!("Failed to parse date_range_json: {}. Input was: '{}'", e, rust_date_range_json)
1399 })
1400 .to_string();
1401 return string_to_c_str(err_message);
1402 }
1403 };
1404
1405 let start_date = rust_date_range.get("start").cloned().unwrap_or_else(|| {
1406 println!("Warning: 'start' key not found in date_range_json, using default");
1407 "1970-01-01".to_string()
1408 });
1409
1410 let end_date = rust_date_range.get("end").cloned().unwrap_or_else(|| {
1411 println!("Warning: 'end' key not found in date_range_json, using default");
1412 "1970-01-02".to_string()
1413 });
1414
1415 let mut date_range_map = HashMap::new();
1416 date_range_map.insert("start_date", start_date.as_str());
1417 date_range_map.insert("end_date", end_date.as_str());
1418
1419 let usernames_refs: Vec<&str> = rust_usernames.iter().map(|s| s.as_str()).collect();
1421 let db_names_refs: Vec<&str> = rust_db_names.iter().map(|s| s.as_str()).collect();
1422 let table_names_refs: Vec<&str> = rust_table_names.iter().map(|s| s.as_str()).collect();
1423
1424 match RUNTIME.block_on(cloud_fetch_parquet_batch(
1425 &usernames_refs,
1426 &db_names_refs,
1427 &table_names_refs,
1428 date_range_map,
1429 )) {
1430 Ok(result) => {
1431 let json_string = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string());
1432 string_to_c_str(json_string)
1433 }
1434 Err(err) => {
1435 let err_message = serde_json::json!({ "error": format!("Failed to batch fetch s3 Parquet files: {}", err) }).to_string();
1436 string_to_c_str(err_message)
1437 }
1438 }
1439 }
1440 _ => {
1441 let err_message = serde_json::json!({
1442 "error": "Invalid arguments to nativeCloudFetchParquetBatch function. Ensure all parameters are valid strings."
1443 })
1444 .to_string();
1445 string_to_c_str(err_message)
1446 }
1447 }
1448 }
1449 }
1450}