use std::{
collections::{HashMap, HashSet},
str::FromStr,
sync::Mutex,
};
use cosmian_crypto_core::{
bytes_ser_de::Serializer, reexport::rand_core::SeedableRng, CsRng, FixedSizeCBytes,
RandomFixedSizeCBytes, SymmetricKey,
};
use cosmian_ffi_utils::{
error::{h_get_error, set_last_error, FfiError},
ffi_read_bytes, ffi_read_string, ffi_unwrap, ffi_write_bytes, ErrorCode,
};
use cosmian_findex::{
Data, Error as FindexError, IndexedValue, IndexedValueToKeywordsMap, Keyword, Keywords, Label,
USER_KEY_LENGTH,
};
use lazy_static::lazy_static;
use tracing::trace;
#[cfg(debug_assertions)]
use crate::logger::log_init;
use crate::{
db_interfaces::{
custom::ffi::{
Delete, DumpTokens, Fetch, FfiCallbacks, FilterObsoleteData, Insert, Interrupt, Upsert,
},
rest::{AuthorizationToken, CallbackPrefix},
DbInterfaceError,
},
ser_de::ffi_ser_de::{
deserialize_data_set, deserialize_indexed_values, deserialize_keyword_set,
get_upsert_output_size, serialize_data_set, serialize_intermediate_results,
serialize_keyword_set,
},
Configuration, InstantiatedFindex,
};
lazy_static! {
/// Global registry of the Findex instances created through this FFI.
///
/// Keys are the `i32` handles handed back to the caller by the
/// `h_instantiate_with_*` functions; each value stores the key, the label and
/// the instantiated Findex that the other entry points look up by handle.
/// A new handle is the number of entries in the map at insertion time.
/// The map is guarded by a `Mutex`; every entry point locks it before use.
static ref FINDEX_INSTANCES: Mutex::<HashMap::<i32, (SymmetricKey<USER_KEY_LENGTH>, Label, InstantiatedFindex)>> =
Mutex::new(HashMap::new());
}
/// Creates a Findex instance backed by caller-provided callbacks and stores it
/// in the global instance cache.
///
/// The `*_entry` callbacks and `dump_tokens` are bundled into the first
/// `FfiCallbacks` of the configuration, with `entry_table_number` as table
/// number. The `*_chain` callbacks go into the second one, whose table number
/// is fixed to 1 and which has neither an `upsert` nor a `dump_tokens`
/// callback.
///
/// On success the handle of the new instance is written to `findex_handle` and
/// `ErrorCode::Success` is returned. Failures are reported through the `ffi_*`
/// macros using the error code given at each call site.
/// NOTE(review): the exact reporting behaviour of these macros is defined in
/// `cosmian_ffi_utils` and is not visible from this file.
///
/// # Panics
///
/// Panics if the instance cache lock is poisoned.
///
/// # Safety
///
/// `findex_handle` is written without any null check and must be valid for
/// writes. `key_ptr`/`key_len` and `label_ptr` are handed to `ffi_read_bytes!`
/// and `ffi_read_string!`; presumably they must describe a readable byte
/// buffer and a NUL-terminated string respectively (confirm against
/// `cosmian_ffi_utils`).
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_instantiate_with_custom_interface(
    findex_handle: *mut i32,
    key_ptr: *const u8,
    key_len: i32,
    label_ptr: *const i8,
    entry_table_number: u32,
    fetch_entry: Fetch,
    fetch_chain: Fetch,
    upsert_entry: Upsert,
    insert_entry: Insert,
    insert_chain: Insert,
    delete_entry: Delete,
    delete_chain: Delete,
    dump_tokens: DumpTokens,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    // Parse the Findex key.
    let raw_key = ffi_read_bytes!("key", key_ptr, key_len);
    let key = ffi_unwrap!(
        SymmetricKey::try_from_slice(raw_key),
        "error deserializing findex key",
        ErrorCode::Serialization
    );
    trace!("Key successfully parsed");

    // Parse the label.
    let raw_label = ffi_read_string!("label", label_ptr);
    let label = Label::from(raw_label.as_str());
    trace!("Label successfully parsed: label: {label}");

    // First callback set: built from the `*_entry` callbacks.
    let entry_callbacks = FfiCallbacks {
        table_number: entry_table_number as usize,
        fetch: Some(fetch_entry),
        upsert: Some(upsert_entry),
        insert: Some(insert_entry),
        delete: Some(delete_entry),
        dump_tokens: Some(dump_tokens),
    };
    // Second callback set: built from the `*_chain` callbacks.
    let chain_callbacks = FfiCallbacks {
        table_number: 1,
        fetch: Some(fetch_chain),
        upsert: None,
        insert: Some(insert_chain),
        delete: Some(delete_chain),
        dump_tokens: None,
    };
    let config = Configuration::Ffi(entry_callbacks, chain_callbacks);

    let runtime = ffi_unwrap!(
        tokio::runtime::Runtime::new(),
        "error creating Tokio runtime",
        ErrorCode::Tokio
    );
    let findex = ffi_unwrap!(
        runtime.block_on(InstantiatedFindex::new(config)),
        "error instantiating Findex with custom backend",
        ErrorCode::Findex
    );

    // Register the instance; its handle is the current size of the cache.
    let mut instances = FINDEX_INSTANCES
        .lock()
        .expect("Findex instance cache lock poisoned.");
    let handle = ffi_unwrap!(
        <i32>::try_from(instances.len()),
        "findex instance cache capacity overflow",
        ErrorCode::Findex
    );
    instances.insert(handle, (key, label, findex));
    *findex_handle = handle;

    ErrorCode::Success.into()
}
/// Creates a Findex instance using the REST configuration and stores it in
/// the global instance cache.
///
/// The authorization token string is parsed into an `AuthorizationToken`; the
/// `findex_key` it carries is the key stored alongside the instance. A null
/// `entry_url_ptr` or `chain_url_ptr` is replaced by an empty URL string.
///
/// On success the handle of the new instance is written to `findex_handle` and
/// `ErrorCode::Success` is returned. Failures are reported through the `ffi_*`
/// macros using the error code given at each call site.
///
/// # Panics
///
/// Panics if the instance cache lock is poisoned.
///
/// # Safety
///
/// `findex_handle` is written without any null check and must be valid for
/// writes. `label_ptr`, `token_ptr` and the non-null URL pointers are handed
/// to `ffi_read_string!`; presumably they must point to NUL-terminated strings
/// (confirm against `cosmian_ffi_utils`).
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_instantiate_with_rest_interface(
    findex_handle: *mut i32,
    label_ptr: *const i8,
    token_ptr: *const i8,
    entry_url_ptr: *const i8,
    chain_url_ptr: *const i8,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    let label_bytes = ffi_read_string!("label", label_ptr);
    let label = Label::from(label_bytes.as_str());
    trace!("Label successfully parsed: label: {label}");

    let token = ffi_read_string!("token", token_ptr);
    // The token embeds the Findex key (see `authorization_token.findex_key`
    // below): its content must never be written to the logs.
    trace!("Authorization token successfully read");

    let authorization_token = ffi_unwrap!(
        crate::db_interfaces::rest::AuthorizationToken::from_str(&token),
        "authorization token conversion failed",
        ErrorCode::Backend
    );

    // A null URL pointer is accepted and mapped to an empty URL.
    let entry_url = if entry_url_ptr.is_null() {
        String::new()
    } else {
        ffi_read_string!("REST server Entry Table URL", entry_url_ptr)
    };
    let chain_url = if chain_url_ptr.is_null() {
        String::new()
    } else {
        ffi_read_string!("REST server Chain Table URL", chain_url_ptr)
    };

    // The token is cloned because its key is still needed for the cache entry.
    let config = Configuration::Rest(authorization_token.clone(), entry_url, chain_url);

    let rt = ffi_unwrap!(
        tokio::runtime::Runtime::new(),
        "error creating Tokio runtime",
        ErrorCode::Tokio
    );
    let findex = ffi_unwrap!(
        rt.block_on(InstantiatedFindex::new(config)),
        "error instantiating Findex with REST backend",
        ErrorCode::Backend
    );

    // Register the instance; its handle is the current size of the cache.
    let mut cache = FINDEX_INSTANCES
        .lock()
        .expect("Findex instance cache lock poisoned.");
    let handle = ffi_unwrap!(
        <i32>::try_from(cache.len()),
        "findex instance cache capacity overflow",
        ErrorCode::Findex
    );
    cache.insert(handle, (authorization_token.findex_key, label, findex));
    *findex_handle = handle;

    ErrorCode::Success.into()
}
/// Creates a Findex instance using the Redis configuration and stores it in
/// the global instance cache.
///
/// The two URL strings are passed, in order, to `Configuration::Redis`.
///
/// On success the handle of the new instance is written to `findex_handle` and
/// `ErrorCode::Success` is returned. Failures are reported through the `ffi_*`
/// macros using the error code given at each call site.
///
/// # Panics
///
/// Panics if the instance cache lock is poisoned.
///
/// # Safety
///
/// `findex_handle` is written without any null check and must be valid for
/// writes. `key_ptr`/`key_len` and the string pointers are handed to
/// `ffi_read_bytes!` and `ffi_read_string!`; presumably they must describe a
/// readable byte buffer and NUL-terminated strings respectively (confirm
/// against `cosmian_ffi_utils`).
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_instantiate_with_redis_interface(
    findex_handle: *mut i32,
    key_ptr: *const u8,
    key_len: i32,
    label_ptr: *const i8,
    entry_table_redis_url_ptr: *const i8,
    chain_table_redis_url_ptr: *const i8,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    let key_bytes = ffi_read_bytes!("key", key_ptr, key_len);
    let key = ffi_unwrap!(
        SymmetricKey::try_from_slice(key_bytes),
        "error deserializing findex key",
        ErrorCode::Serialization
    );
    trace!("Key successfully parsed");

    let label_bytes = ffi_read_string!("label", label_ptr);
    let label = Label::from(label_bytes.as_str());
    trace!("Label successfully parsed: label: {label}");

    let entry_table_redis_url =
        ffi_read_string!("Redis entry table URL", entry_table_redis_url_ptr);
    let chain_table_redis_url =
        ffi_read_string!("Redis chain table URL", chain_table_redis_url_ptr);

    let config = Configuration::Redis(entry_table_redis_url, chain_table_redis_url);

    let rt = ffi_unwrap!(
        tokio::runtime::Runtime::new(),
        "error creating Tokio runtime",
        ErrorCode::Tokio
    );
    let findex = ffi_unwrap!(
        rt.block_on(InstantiatedFindex::new(config)),
        "error instantiating Findex with Redis backend",
        ErrorCode::Findex
    );

    // Register the instance; its handle is the current size of the cache.
    let mut cache = FINDEX_INSTANCES
        .lock()
        .expect("Findex instance cache lock poisoned.");
    let handle = ffi_unwrap!(
        <i32>::try_from(cache.len()),
        "findex instance cache capacity overflow",
        ErrorCode::Findex
    );
    cache.insert(handle, (key, label, findex));
    *findex_handle = handle;

    ErrorCode::Success.into()
}
/// Runs a Findex search on the instance identified by `findex_handle`.
///
/// The keywords are deserialized from `keywords_ptr`/`keywords_len`. During
/// the search, intermediate results are serialized and passed to `interrupt`;
/// a return value of 1 from that callback is reported to Findex as `true`, any
/// other value as `false`.
///
/// The final results are serialized as a LEB128 entry count followed, for each
/// keyword, by the keyword (`write_vec`) and its serialized data set
/// (`write_array`), then handed to `ffi_write_bytes!` together with
/// `results_ptr`/`results_len`.
///
/// A backend error carrying an FFI error code returns that code; any other
/// Findex error returns `ErrorCode::Findex`. In both cases the message is
/// stored with `set_last_error`.
///
/// The instance cache lock is held for the whole operation.
///
/// # Panics
///
/// Panics if the instance cache lock is poisoned.
///
/// # Safety
///
/// The pointers are handed to `ffi_read_bytes!` and `ffi_write_bytes!`;
/// presumably they must describe valid input and output buffers (confirm
/// against `cosmian_ffi_utils`). `interrupt` is called with a pointer to a
/// buffer that only lives for the duration of the call.
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_search(
    results_ptr: *mut u8,
    results_len: *mut i32,
    findex_handle: i32,
    keywords_ptr: *const u8,
    keywords_len: i32,
    interrupt: Interrupt,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    let keywords = Keywords::from(ffi_unwrap!(
        deserialize_keyword_set(ffi_read_bytes!("keywords", keywords_ptr, keywords_len)),
        "error deserializing keywords",
        ErrorCode::Serialization
    ));
    trace!("Keywords successfully parsed: keywords: {keywords}");

    // Adapter exposing the C interrupt callback as an async closure.
    let user_interrupt = |res: HashMap<Keyword, HashSet<IndexedValue<Keyword, Data>>>| async move {
        trace!("user interrupt input: {res:?}");
        let serialized = serialize_intermediate_results(&res).map_err(|e| e.to_string())?;
        let serialized_len = <u32>::try_from(serialized.len()).map_err(|e| e.to_string())?;
        // Only a return value of exactly 1 counts as an interruption request.
        let is_interrupted = 1 == (interrupt)(serialized.as_ptr(), serialized_len);
        trace!("user interrupt output: = {is_interrupted}");
        Ok(is_interrupted)
    };

    let instances = FINDEX_INSTANCES
        .lock()
        .expect("Findex instance cache lock poisoned.");
    let (key, label, findex) = ffi_unwrap!(
        instances
            .get(&findex_handle)
            .ok_or_else(|| format!("no matching instance for handle {findex_handle}")),
        "cannot get a hold on the Findex instance",
        ErrorCode::Findex
    );

    let runtime = ffi_unwrap!(
        tokio::runtime::Runtime::new(),
        "error creating Tokio runtime",
        ErrorCode::Tokio
    );

    let results = match runtime.block_on(findex.search(key, label, keywords, &user_interrupt)) {
        Ok(found) => found,
        // Backend errors carry their own error code: forward it as is.
        Err(FindexError::DbInterface(DbInterfaceError::Ffi(msg, code))) => {
            set_last_error(FfiError::Generic(format!(
                "backend error during `search` operation: {msg}"
            )));
            return code.into();
        }
        Err(e) => {
            set_last_error(FfiError::Generic(format!("findex `search` error: {e}")));
            return ErrorCode::Findex.into();
        }
    };

    // Output layout: entry count, then (keyword, serialized data set) pairs.
    let mut output = Serializer::new();
    ffi_unwrap!(
        output.write_leb128_u64(results.len() as u64),
        "error serializing length",
        ErrorCode::Serialization
    );
    for (keyword, data) in results {
        ffi_unwrap!(
            output.write_vec(&keyword),
            "error serializing keyword",
            ErrorCode::Serialization
        );
        let data_bytes = ffi_unwrap!(
            serialize_data_set(&data),
            "error serializing set",
            ErrorCode::Serialization
        );
        ffi_unwrap!(
            output.write_array(&data_bytes),
            "error serializing data",
            ErrorCode::Serialization
        );
    }
    let payload = output.finalize();

    ffi_write_bytes!("search results", &payload, results_ptr, results_len);
}
/// Adds the given associations to the index of the instance identified by
/// `findex_handle`.
///
/// The associations are deserialized from `associations_ptr`/
/// `associations_len`. Before the index is touched, `*results_len` (the size
/// allocated by the caller) is compared with the size computed by
/// `get_upsert_output_size`: when the buffer is too small, the required size
/// is written to `*results_len` and `ErrorCode::BufferTooSmall` is returned.
///
/// On success, the keyword set returned by `findex.add` is serialized and
/// handed to `ffi_write_bytes!` together with `results_ptr`/`results_len`.
///
/// A backend error carrying an FFI error code returns that code; any other
/// Findex error returns `ErrorCode::Findex`. In both cases the message is
/// stored with `set_last_error`.
///
/// # Panics
///
/// Panics if the instance cache lock is poisoned.
///
/// # Safety
///
/// `results_len` is dereferenced without any null check and must be valid for
/// reads and writes. The other pointers are handed to `ffi_read_bytes!` and
/// `ffi_write_bytes!`; presumably they must describe valid input and output
/// buffers (confirm against `cosmian_ffi_utils`).
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_add(
    results_ptr: *mut u8,
    results_len: *mut i32,
    findex_handle: i32,
    associations_ptr: *const u8,
    associations_len: i32,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    let associations_bytes = ffi_read_bytes!("associations", associations_ptr, associations_len);
    let associations = IndexedValueToKeywordsMap::from(ffi_unwrap!(
        deserialize_indexed_values(associations_bytes),
        "failed deserialize indexed values (associations)",
        ErrorCode::Serialization
    ));

    // Checked conversion: a silently truncated size would defeat the buffer
    // check below.
    let output_size = ffi_unwrap!(
        <i32>::try_from(get_upsert_output_size(&associations)),
        "add results size does not fit in an i32",
        ErrorCode::Serialization
    );
    let allocated = *results_len;
    if allocated < output_size {
        // Report the allocated size read through the pointer, not the pointer
        // itself.
        set_last_error(FfiError::Generic(format!(
            "The pre-allocated add results buffer is too small; need {} bytes, allocated {}",
            output_size, allocated
        )));
        // Tell the caller how many bytes are needed for a retry.
        *results_len = output_size;
        return ErrorCode::BufferTooSmall.into();
    }

    let cache = FINDEX_INSTANCES
        .lock()
        .expect("Findex instance cache lock poisoned.");
    let (key, label, findex) = ffi_unwrap!(
        cache
            .get(&findex_handle)
            .ok_or_else(|| format!("no matching instance for handle {findex_handle}")),
        "cannot get a hold on the Findex instance",
        ErrorCode::Findex
    );
    trace!("instantiated Findex: {findex:?}");

    let rt = ffi_unwrap!(
        tokio::runtime::Runtime::new(),
        "error creating Tokio runtime",
        ErrorCode::Tokio
    );

    let res = rt.block_on(findex.add(key, label, associations));
    let new_keywords = match res {
        Ok(new_keywords) => new_keywords,
        // Backend errors carry their own error code: forward it as is.
        Err(FindexError::DbInterface(DbInterfaceError::Ffi(msg, code))) => {
            set_last_error(FfiError::Generic(format!(
                "backend error during `add` operation: {msg}"
            )));
            return code.into();
        }
        Err(e) => {
            set_last_error(FfiError::Generic(format!("findex `add` error: {e}")));
            return ErrorCode::Findex.into();
        }
    };

    let serialized_keywords = ffi_unwrap!(
        serialize_keyword_set(&new_keywords),
        "serialize new keywords",
        ErrorCode::Serialization
    );
    ffi_write_bytes!(
        "add results",
        &serialized_keywords,
        results_ptr,
        results_len
    );
}
/// Removes the given associations from the index of the instance identified
/// by `findex_handle`.
///
/// The associations are deserialized from `associations_ptr`/
/// `associations_len`. Before the index is touched, `*results_len` (the size
/// allocated by the caller) is compared with the size computed by
/// `get_upsert_output_size`: when the buffer is too small, the required size
/// is written to `*results_len` and `ErrorCode::BufferTooSmall` is returned.
///
/// On success, the keyword set returned by `findex.delete` is serialized and
/// handed to `ffi_write_bytes!` together with `results_ptr`/`results_len`.
///
/// A backend error carrying an FFI error code returns that code; any other
/// Findex error returns `ErrorCode::Findex`. In both cases the message is
/// stored with `set_last_error`.
///
/// # Panics
///
/// Panics if the instance cache lock is poisoned.
///
/// # Safety
///
/// `results_len` is dereferenced without any null check and must be valid for
/// reads and writes. The other pointers are handed to `ffi_read_bytes!` and
/// `ffi_write_bytes!`; presumably they must describe valid input and output
/// buffers (confirm against `cosmian_ffi_utils`).
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_delete(
    results_ptr: *mut u8,
    results_len: *mut i32,
    findex_handle: i32,
    associations_ptr: *const u8,
    associations_len: i32,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    let associations_bytes = ffi_read_bytes!("associations", associations_ptr, associations_len);
    let associations = IndexedValueToKeywordsMap::from(ffi_unwrap!(
        deserialize_indexed_values(associations_bytes),
        "failed deserialize indexed values (associations)",
        ErrorCode::Serialization
    ));

    // Checked conversion: a silently truncated size would defeat the buffer
    // check below.
    let output_size = ffi_unwrap!(
        <i32>::try_from(get_upsert_output_size(&associations)),
        "delete results size does not fit in an i32",
        ErrorCode::Serialization
    );
    let allocated = *results_len;
    if allocated < output_size {
        // Report the allocated size read through the pointer, not the pointer
        // itself.
        set_last_error(FfiError::Generic(format!(
            "The pre-allocated delete results buffer is too small; need {} bytes, allocated {}",
            output_size, allocated
        )));
        // Tell the caller how many bytes are needed for a retry.
        *results_len = output_size;
        return ErrorCode::BufferTooSmall.into();
    }

    let cache = FINDEX_INSTANCES
        .lock()
        .expect("Findex instance cache lock poisoned.");
    let (key, label, findex) = ffi_unwrap!(
        cache
            .get(&findex_handle)
            .ok_or_else(|| format!("no matching instance for handle {findex_handle}")),
        "cannot get a hold on the Findex instance",
        ErrorCode::Findex
    );
    trace!("instantiated Findex: {findex:?}");

    let rt = ffi_unwrap!(
        tokio::runtime::Runtime::new(),
        "error creating Tokio runtime",
        ErrorCode::Tokio
    );

    let res = rt.block_on(findex.delete(key, label, associations));
    let new_keywords = match res {
        Ok(new_keywords) => new_keywords,
        // Backend errors carry their own error code: forward it as is.
        Err(FindexError::DbInterface(DbInterfaceError::Ffi(msg, code))) => {
            set_last_error(FfiError::Generic(format!(
                "backend error during `delete` operation: {msg}"
            )));
            return code.into();
        }
        Err(e) => {
            set_last_error(FfiError::Generic(format!("findex `delete` error: {e}")));
            return ErrorCode::Findex.into();
        }
    };

    let serialized_keywords = ffi_unwrap!(
        serialize_keyword_set(&new_keywords),
        "serialize new keywords",
        ErrorCode::Serialization
    );
    ffi_write_bytes!(
        "delete results",
        &serialized_keywords,
        results_ptr,
        results_len
    );
}
/// Compacts the index of the instance identified by `findex_handle`, moving
/// it from the cached key and label to the given new key and label.
///
/// `filter_obsolete_data` is called with a serialized data set and an output
/// buffer of the same size; a non-zero return value is treated as a failure.
/// The content of the output buffer is then deserialized as the filtered set.
///
/// When the compact operation succeeds, the key and label cached for this
/// handle are replaced by the new ones and `ErrorCode::Success` is returned.
/// A backend error carrying an FFI error code returns that code; any other
/// Findex error returns `ErrorCode::Findex`. In both cases the message is
/// stored with `set_last_error` and the cached key and label are left
/// untouched.
///
/// # Panics
///
/// Panics if the instance cache lock is poisoned.
///
/// # Safety
///
/// `new_key_ptr`/`new_key_len` and `new_label_ptr` are handed to
/// `ffi_read_bytes!` and `ffi_read_string!`; presumably they must describe a
/// readable byte buffer and a NUL-terminated string respectively (confirm
/// against `cosmian_ffi_utils`). `filter_obsolete_data` receives pointers to
/// buffers that only live for the duration of the call.
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_compact(
    findex_handle: i32,
    new_key_ptr: *const u8,
    new_key_len: i32,
    new_label_ptr: *const i8,
    compacting_rate: f64,
    filter_obsolete_data: FilterObsoleteData,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    // Adapter exposing the C filter callback as an async closure.
    let filter = |data: HashSet<Data>| async {
        let moved_data = data;
        let bytes =
            serialize_data_set(&moved_data).map_err(|e| format!("error serializing data: {e}"))?;
        // Checked conversion: a truncated length would make the callback see
        // only part of the data set.
        let bytes_length = <u32>::try_from(bytes.len())
            .map_err(|e| format!("serialized data is too large for the filter callback: {e}"))?;
        // The output buffer is as large as the input.
        let mut res = vec![0; bytes.len()];
        let mut res_length = bytes_length;
        let err = (filter_obsolete_data)(
            res.as_mut_ptr(),
            &mut res_length,
            bytes.as_ptr(),
            bytes_length,
        );
        if err != 0 {
            set_last_error(FfiError::Generic(format!("filter callback error: {err}")));
            return Err(String::from("Filter error."));
        }
        // NOTE(review): the length written back by the callback in
        // `res_length` is not used to trim `res`; this presumably relies on
        // the serialized set being self-delimiting. Confirm.
        deserialize_data_set(&res).map_err(|e| format!("error deserializing filtered data: {e}"))
    };

    let new_key_bytes = ffi_read_bytes!("new key", new_key_ptr, new_key_len);
    let new_key = ffi_unwrap!(
        SymmetricKey::try_from_slice(new_key_bytes),
        "error deserializing new findex key",
        ErrorCode::Serialization
    );

    let new_label_bytes = ffi_read_string!("new label", new_label_ptr);
    let new_label = Label::from(new_label_bytes.as_str());

    // Mutable access is needed to swap the cached key and label on success.
    let mut cache = FINDEX_INSTANCES
        .lock()
        .expect("Findex instance cache lock poisoned.");
    let (old_key, old_label, findex) = ffi_unwrap!(
        cache
            .get_mut(&findex_handle)
            .ok_or_else(|| format!("no matching instance for handle {findex_handle}")),
        "cannot get a hold on the Findex instance",
        ErrorCode::Findex
    );

    let rt = ffi_unwrap!(
        tokio::runtime::Runtime::new(),
        "error creating Tokio runtime",
        ErrorCode::Tokio
    );
    trace!("instantiated Findex: {findex:?}");

    let res = rt.block_on(findex.compact(
        old_key,
        &new_key,
        old_label,
        &new_label,
        compacting_rate,
        &filter,
    ));

    match res {
        // Backend errors carry their own error code: forward it as is.
        Err(FindexError::DbInterface(DbInterfaceError::Ffi(msg, code))) => {
            set_last_error(FfiError::Generic(format!(
                "backend error during `compact` operation: {msg}"
            )));
            code.into()
        }
        Err(e) => {
            set_last_error(FfiError::Generic(format!("findex `compact` error: {e}")));
            ErrorCode::Findex.into()
        }
        Ok(()) => {
            // Later operations on this handle use the new key and label.
            *old_key = new_key;
            *old_label = new_label;
            ErrorCode::Success.into()
        }
    }
}
/// Generates a new authorization token and writes its string form to the
/// output buffer.
///
/// A fresh Findex key is drawn from a `CsRng` seeded from entropy. The four
/// seeds read from the input buffers are registered under the
/// `FetchEntry`, `FetchChain`, `Upsert` and `Insert` callback prefixes. The
/// token built from the index id, the key and the seeds is converted to a
/// string whose bytes are handed to `ffi_write_bytes!` together with
/// `token_ptr`/`token_len`.
///
/// A seed that cannot be converted to a `SymmetricKey` is reported with
/// `ErrorCode::Serialization`; a failure to build the token with
/// `ErrorCode::Findex`.
///
/// # Safety
///
/// All pointers are handed to `ffi_read_string!`, `ffi_read_bytes!` and
/// `ffi_write_bytes!`; presumably they must describe a NUL-terminated string,
/// readable byte buffers and a writable output buffer respectively (confirm
/// against `cosmian_ffi_utils`).
#[no_mangle]
#[tracing::instrument(ret, skip_all)]
pub unsafe extern "C" fn h_generate_new_token(
    token_ptr: *mut u8,
    token_len: *mut i32,
    index_id_ptr: *const i8,
    fetch_entries_seed_ptr: *const u8,
    fetch_entries_seed_len: i32,
    fetch_chains_seed_ptr: *const u8,
    fetch_chains_seed_len: i32,
    upsert_entries_seed_ptr: *const u8,
    upsert_entries_seed_len: i32,
    insert_chains_seed_ptr: *const u8,
    insert_chains_seed_len: i32,
) -> i32 {
    #[cfg(debug_assertions)]
    log_init();

    let index_id: String = ffi_read_string!("index id", index_id_ptr);

    // All the input buffers are read before any seed is parsed.
    let fetch_entries_seed = ffi_read_bytes!(
        "fetch_entries_seed",
        fetch_entries_seed_ptr,
        fetch_entries_seed_len
    );
    let fetch_chains_seed = ffi_read_bytes!(
        "fetch_chains_seed",
        fetch_chains_seed_ptr,
        fetch_chains_seed_len
    );
    let upsert_entries_seed = ffi_read_bytes!(
        "upsert_entries_seed",
        upsert_entries_seed_ptr,
        upsert_entries_seed_len
    );
    let insert_chains_seed = ffi_read_bytes!(
        "insert_chains_seed",
        insert_chains_seed_ptr,
        insert_chains_seed_len
    );

    // One seed per callback prefix.
    let mut seeds = HashMap::new();
    seeds.insert(
        CallbackPrefix::FetchEntry,
        ffi_unwrap!(
            SymmetricKey::try_from_slice(fetch_entries_seed),
            "fetch_entries_seed is of wrong size",
            ErrorCode::Serialization
        ),
    );
    seeds.insert(
        CallbackPrefix::FetchChain,
        ffi_unwrap!(
            SymmetricKey::try_from_slice(fetch_chains_seed),
            "fetch_chains_seed is of wrong size",
            ErrorCode::Serialization
        ),
    );
    seeds.insert(
        CallbackPrefix::Upsert,
        ffi_unwrap!(
            SymmetricKey::try_from_slice(upsert_entries_seed),
            "upsert_entries_seed is of wrong size",
            ErrorCode::Serialization
        ),
    );
    seeds.insert(
        CallbackPrefix::Insert,
        ffi_unwrap!(
            SymmetricKey::try_from_slice(insert_chains_seed),
            "insert_chains_seed is of wrong size",
            ErrorCode::Serialization
        ),
    );

    // The Findex key embedded in the token is freshly generated.
    let mut rng = CsRng::from_entropy();
    let findex_key = SymmetricKey::new(&mut rng);

    let token = ffi_unwrap!(
        AuthorizationToken::new(index_id, findex_key, seeds),
        "generate authorization token",
        ErrorCode::Findex
    );

    // The buffer label names what is actually written: the token.
    ffi_write_bytes!(
        "authorization token",
        token.to_string().as_bytes(),
        token_ptr,
        token_len
    );
}
/// Exposes the last recorded FFI error to the caller.
///
/// This is a plain forwarder to `h_get_error` from `cosmian_ffi_utils`: both
/// arguments are passed through unchanged and its return value is returned
/// as is.
/// NOTE(review): presumably `h_get_error` copies the message stored by
/// `set_last_error` into `error_ptr` and updates `error_len`; confirm against
/// `cosmian_ffi_utils`.
///
/// # Safety
///
/// The pointers are not checked here; they must satisfy whatever `h_get_error`
/// requires of them.
#[no_mangle]
pub unsafe extern "C" fn get_last_error(error_ptr: *mut i8, error_len: *mut i32) -> i32 {
h_get_error(error_ptr, error_len)
}