use chrono::Utc;
use libloading::{Library, Symbol};
use crate::context::Context;
use crate::error::TaskError;
use crate::registry::loader::package_loader::get_library_extension;
use crate::task::{Task, TaskNamespace};
#[derive(Debug)]
pub(super) struct DynamicLibraryTask {
library_data: Vec<u8>,
task_name: String,
package_name: String,
dependencies: Vec<TaskNamespace>,
}
impl DynamicLibraryTask {
pub(super) fn new(
library_data: Vec<u8>,
task_name: String,
package_name: String,
dependencies: Vec<TaskNamespace>,
) -> Self {
Self {
library_data,
task_name,
package_name,
dependencies,
}
}
}
#[async_trait::async_trait]
impl Task for DynamicLibraryTask {
async fn execute(
&self,
context: Context<serde_json::Value>,
) -> Result<Context<serde_json::Value>, TaskError> {
let library_extension = get_library_extension();
let temp_dir = tempfile::TempDir::new().map_err(|e| TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!(
"Failed to create temp directory for package '{}': {}",
self.package_name, e
),
timestamp: Utc::now(),
})?;
let temp_path = temp_dir
.path()
.join(format!("task_exec.{}", library_extension));
std::fs::write(&temp_path, &self.library_data).map_err(|e| TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!("Failed to write library to temp file: {}", e),
timestamp: Utc::now(),
})?;
let lib = unsafe {
Library::new(&temp_path).map_err(|e| TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!(
"Failed to load library for package '{}' at {}: {}",
self.package_name,
temp_path.display(),
e
),
timestamp: Utc::now(),
})?
};
let execute_task_symbol = b"cloacina_execute_task";
let execute_task: Symbol<
unsafe extern "C" fn(
task_name: *const std::os::raw::c_char,
task_name_len: u32,
context_json: *const std::os::raw::c_char,
context_len: u32,
result_buffer: *mut u8,
result_capacity: u32,
result_len: *mut u32,
) -> i32,
> = unsafe {
lib.get(execute_task_symbol)
.map_err(|e| TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!(
"Symbol 'cloacina_execute_task' not found in library for package '{}': {}",
self.package_name, e
),
timestamp: Utc::now(),
})?
};
let task_name_cstring = std::ffi::CString::new(self.task_name.clone()).map_err(|e| {
TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!("Invalid task name: {}", e),
timestamp: Utc::now(),
}
})?;
let context_json =
serde_json::to_string(context.data()).map_err(|e| TaskError::ValidationFailed {
message: format!(
"Failed to serialize context for task {}: {}",
self.task_name, e
),
})?;
tracing::debug!("Task '{}' input context: {}", self.task_name, context_json);
eprintln!(
"DEBUG: Task '{}' input context: {}",
self.task_name, context_json
);
let context_cstring =
std::ffi::CString::new(context_json).map_err(|e| TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!("Invalid context JSON: {}", e),
timestamp: Utc::now(),
})?;
let mut result_buffer = vec![0u8; 10 * 1024 * 1024]; let mut result_len = 0u32;
let return_code = unsafe {
execute_task(
task_name_cstring.as_ptr(),
task_name_cstring.as_bytes().len() as u32,
context_cstring.as_ptr(),
context_cstring.as_bytes().len() as u32,
result_buffer.as_mut_ptr(),
result_buffer.len() as u32,
&mut result_len,
)
};
if return_code == 0 {
let mut result_context = context;
if result_len > 0 {
if result_len > result_buffer.len() as u32 {
return Err(TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!(
"Task execution result too large: {} bytes exceeds maximum buffer size of {} bytes. \
This indicates the task context has grown beyond the database storage limit.",
result_len,
result_buffer.len()
),
timestamp: Utc::now(),
});
}
result_buffer.truncate(result_len as usize);
let result_str =
String::from_utf8(result_buffer).map_err(|e| TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!("Invalid UTF-8 in result: {}", e),
timestamp: Utc::now(),
})?;
tracing::debug!("Task '{}' output result: {}", self.task_name, result_str);
eprintln!(
"DEBUG: Task '{}' output result: {}",
self.task_name, result_str
);
let result_value: serde_json::Value =
serde_json::from_str(&result_str).map_err(|e| TaskError::ValidationFailed {
message: format!(
"Invalid JSON in result for task {}: {}",
self.task_name, e
),
})?;
if let serde_json::Value::Object(obj) = result_value {
for (key, value) in obj {
if result_context.get(&key).is_some() {
result_context.update(key, value).map_err(|e| {
TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!("Failed to update result: {}", e),
timestamp: Utc::now(),
}
})?;
} else {
result_context.insert(key, value).map_err(|e| {
TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: format!("Failed to insert result: {}", e),
timestamp: Utc::now(),
}
})?;
}
}
}
}
Ok(result_context)
} else {
let error_msg = if result_len > 0 {
if result_len > result_buffer.len() as u32 {
format!(
"Task execution failed (code: {}) with oversized error message: {} bytes exceeds buffer size of {} bytes",
return_code, result_len, result_buffer.len()
)
} else {
result_buffer.truncate(result_len as usize);
String::from_utf8_lossy(&result_buffer).to_string()
}
} else {
format!("Task execution failed with code {}", return_code)
};
Err(TaskError::ExecutionFailed {
task_id: self.task_name.clone(),
message: error_msg,
timestamp: Utc::now(),
})
}
}
fn id(&self) -> &str {
&self.task_name
}
fn dependencies(&self) -> &[TaskNamespace] {
&self.dependencies
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_dynamic_library_task_creation() {
let task = DynamicLibraryTask::new(
vec![0x7f, 0x45, 0x4c, 0x46], "test_task".to_string(),
"test_package".to_string(),
Vec::new(), );
assert_eq!(task.id(), "test_task");
assert_eq!(task.dependencies().len(), 0); }
}