use super::types::{FfiStr, SendMutPtr};
use super::vtable_gen::{make_snapshot_stream, SendFfiSnapshotIteratorResponse};
use super::vtables::SnapshotFetcherVtable;
/// Proxy that implements `drasi_lib::SnapshotFetcher` by calling through a
/// `SnapshotFetcherVtable` held behind a raw pointer.
///
/// When the proxy is dropped and the pointer is non-null, the vtable's
/// `drop_fn` is invoked with the vtable's `state`.
pub struct FfiSnapshotFetcherProxy {
/// Raw pointer to the vtable whose `fetch_snapshot_fn`, `drop_fn`, and `state`
/// members are used by this proxy. `Drop` tolerates a null pointer here.
pub(crate) vtable: *const SnapshotFetcherVtable,
}
// Manually marks the proxy as transferable and shareable across threads; the
// raw `vtable` pointer field would otherwise make it neither `Send` nor `Sync`.
// NOTE(review): soundness depends on the vtable and its `state` being safe to
// use from any thread (`fetch_snapshot` invokes the callback from a
// `spawn_blocking` worker). That contract is not visible in this file — confirm
// against the FFI vtable documentation.
unsafe impl Send for FfiSnapshotFetcherProxy {}
unsafe impl Sync for FfiSnapshotFetcherProxy {}
impl Drop for FfiSnapshotFetcherProxy {
    /// Calls the vtable's `drop_fn` with the vtable's `state`.
    ///
    /// A null `vtable` pointer is treated as "nothing to release" and the
    /// function returns without calling anything.
    fn drop(&mut self) {
        // SAFETY: `as_ref` yields `None` for a null pointer. For a non-null
        // pointer this assumes it still refers to a live, properly aligned
        // `SnapshotFetcherVtable` — NOTE(review): confirm against whoever
        // constructs the proxy.
        if let Some(table) = unsafe { self.vtable.as_ref() } {
            let release = table.drop_fn;
            release(table.state);
        }
    }
}
#[async_trait::async_trait]
impl drasi_lib::SnapshotFetcher for FfiSnapshotFetcherProxy {
async fn fetch_snapshot(
&self,
query_id: &str,
) -> Result<
drasi_lib::queries::output_state::SnapshotStream,
drasi_lib::queries::output_state::FetchError,
> {
let vtable = unsafe { &*self.vtable };
let cb = vtable.fetch_snapshot_fn;
let state = SendMutPtr(vtable.state);
let query_id_owned = query_id.to_string();
let wrapped = tokio::task::spawn_blocking(move || {
let query_id_ffi = FfiStr::from_str(&query_id_owned);
SendFfiSnapshotIteratorResponse(cb(state.as_ptr(), query_id_ffi))
})
.await
.map_err(
|_| drasi_lib::queries::output_state::FetchError::NotRunning {
status: drasi_lib::ComponentStatus::Error,
},
)?;
let resp = wrapped.0;
let err_str = unsafe { resp.error.into_string() };
if !err_str.is_empty() {
log::error!("[FFI fetch_snapshot] host returned error: {err_str}");
return Err(drasi_lib::queries::output_state::FetchError::NotRunning {
status: drasi_lib::ComponentStatus::Error,
});
}
let as_of_sequence = resp.as_of_sequence;
let config_hash = resp.config_hash;
let stream = make_snapshot_stream(resp.iterator);
Ok(
drasi_lib::queries::output_state::SnapshotStream::from_stream(
stream,
as_of_sequence,
config_hash,
),
)
}
}