torc 0.22.1

Workflow management system
use super::*;
use crate::server::api::ComputeNodesApi;

#[allow(clippy::too_many_arguments)]
impl<C> Server<C>
where
    C: Has<XSpanIdString> + Has<Option<Authorization>> + Send + Sync,
{
    pub(super) async fn transport_create_compute_node(
        &self,
        body: models::ComputeNodeModel,
        context: &C,
    ) -> Result<CreateComputeNodeResponse, ApiError> {
        authorize_workflow!(self, body.workflow_id, context, CreateComputeNodeResponse);

        let result = self
            .compute_nodes_api
            .create_compute_node(body.clone(), context)
            .await?;

        if let CreateComputeNodeResponse::SuccessfulResponse(ref created) = result {
            self.event_broadcaster.broadcast(BroadcastEvent {
                workflow_id: body.workflow_id,
                timestamp: chrono::Utc::now().timestamp_millis(),
                event_type: "compute_node_started".to_string(),
                severity: models::EventSeverity::Info,
                data: serde_json::json!({
                    "compute_node_id": created.id,
                    "hostname": body.hostname,
                    "pid": body.pid,
                    "num_cpus": body.num_cpus,
                    "memory_gb": body.memory_gb,
                    "num_gpus": body.num_gpus,
                    "compute_node_type": body.compute_node_type,
                }),
            });
        }

        Ok(result)
    }
    pub(super) async fn transport_delete_compute_nodes(
        &self,
        workflow_id: i64,
        context: &C,
    ) -> Result<DeleteComputeNodesResponse, ApiError> {
        authorize_workflow!(self, workflow_id, context, DeleteComputeNodesResponse);
        self.compute_nodes_api
            .delete_compute_nodes(workflow_id, context)
            .await
    }
    pub(super) async fn transport_list_compute_nodes(
        &self,
        workflow_id: i64,
        offset: Option<i64>,
        limit: Option<i64>,
        sort_by: Option<String>,
        reverse_sort: Option<bool>,
        hostname: Option<String>,
        is_active: Option<bool>,
        scheduled_compute_node_id: Option<i64>,
        context: &C,
    ) -> Result<ListComputeNodesResponse, ApiError> {
        authorize_workflow!(self, workflow_id, context, ListComputeNodesResponse);
        let (offset, limit) = process_pagination_params(offset, limit)?;
        self.compute_nodes_api
            .list_compute_nodes(
                workflow_id,
                offset,
                limit,
                sort_by,
                reverse_sort,
                hostname,
                is_active,
                scheduled_compute_node_id,
                context,
            )
            .await
    }
    pub(super) async fn transport_get_compute_node(
        &self,
        id: i64,
        context: &C,
    ) -> Result<GetComputeNodeResponse, ApiError> {
        authorize_resource!(self, id, "compute_node", context, GetComputeNodeResponse);
        self.compute_nodes_api.get_compute_node(id, context).await
    }
    pub(super) async fn transport_update_compute_node(
        &self,
        id: i64,
        body: models::ComputeNodeModel,
        context: &C,
    ) -> Result<UpdateComputeNodeResponse, ApiError> {
        authorize_resource!(self, id, "compute_node", context, UpdateComputeNodeResponse);
        let result = self
            .compute_nodes_api
            .update_compute_node(id, body.clone(), context)
            .await?;
        if let UpdateComputeNodeResponse::SuccessfulResponse(ref _updated) = result
            && body.is_active == Some(false)
        {
            self.event_broadcaster.broadcast(BroadcastEvent {
                workflow_id: body.workflow_id,
                timestamp: chrono::Utc::now().timestamp_millis(),
                event_type: "compute_node_stopped".to_string(),
                severity: models::EventSeverity::Info,
                data: serde_json::json!({
                    "compute_node_id": id,
                    "hostname": body.hostname,
                    "pid": body.pid,
                    "duration_seconds": body.duration_seconds,
                    "compute_node_type": body.compute_node_type,
                }),
            });
        }
        Ok(result)
    }
    pub(super) async fn transport_delete_compute_node(
        &self,
        id: i64,
        context: &C,
    ) -> Result<DeleteComputeNodeResponse, ApiError> {
        authorize_resource!(self, id, "compute_node", context, DeleteComputeNodeResponse);
        self.compute_nodes_api
            .delete_compute_node(id, context)
            .await
    }
}