orign 0.2.3

A globally distributed container orchestrator
Documentation
use crate::auth::agent::get_agent_token_root;
use crate::handlers::v1::humans::process_human_response;
use crate::resources::v1::humans::models::V1ApprovalResponse;
use crate::resources::v1::humans::models::V1FeedbackResponse;
use crate::resources::v1::humans::models::V1FeedbackResponseKind;
use serde::{Deserialize, Serialize};
use slack_morphism::errors::SlackClientError;
use slack_morphism::events::SlackInteractionEvent;
use slack_morphism::prelude::*;
use std::env;
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::debug;

// For HTTP calls
use sea_orm::DatabaseConnection;
use thiserror::Error;

/// Errors produced by the Slack socket-mode integration in this module.
///
/// The `#[from]` variants let `?` convert the underlying library errors
/// automatically; `Other` carries a free-form message for everything else.
#[derive(Debug, Error)]
pub enum MySlackAppError {
    /// A required environment variable could not be read; holds the variable name.
    #[error("Environment variable {0} not set or invalid")]
    EnvVar(String),
    /// The Slack interaction payload was missing a field this module requires.
    #[error("Invalid interaction payload")]
    InvalidPayload,
    /// Wrapper around a `reqwest` error.
    #[error("Network or request error: {0}")]
    RequestError(#[from] reqwest::Error),
    /// Wrapper around an error reported by the Slack client.
    #[error("Slack error: {0}")]
    Slack(#[from] SlackClientError),
    /// Wrapper around a standard I/O error.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
    /// Any other failure, described by a human-readable message.
    #[error("Other error: {0}")]
    Other(String),
}

/// Shorthand for results whose error type is [`MySlackAppError`].
type MyResult<T> = Result<T, MySlackAppError>;

/// Metadata carried in the `value` of a yes/no Slack button.
///
/// `handle_interactions` deserializes this from the JSON string stored in the
/// clicked action's `value`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FeedbackMetadata {
    // Decision encoded in the clicked button; shown as "Yes"/"No" in the updated message.
    approved: bool,
    // Question text; used as the approval response content and echoed in the updated message.
    question: Option<String>,
    // Feedback request ID; the handler rejects the interaction when it is absent.
    id: Option<String>,
    // Owner identifier; the handler treats a value containing '@' as a user and anything else as an org.
    owner: Option<String>,
    // Not read by the handler in this file — presumably the Orign server URL; TODO confirm.
    orign_url: Option<String>,
    // Dot-separated reference with exactly three parts, read as `name.namespace.kind`.
    resource_ref: Option<String>,
    // Forwarded unchanged into the approval response.
    messages: Option<serde_json::Value>,
    // Forwarded unchanged into the approval response.
    images: Option<Vec<String>>,
    // Forwarded unchanged into the approval response.
    videos: Option<Vec<String>>,
}

/// Handles a Slack interaction event produced by the yes/no approval buttons.
///
/// Steps, in order:
/// 1. Accept only `BlockActions` events and look at the first action.
/// 2. Ignore (return `Ok`) any action whose ID is not `boolean_action_yes` or
///    `boolean_action_no`.
/// 3. Parse the action's `value` as [`FeedbackMetadata`] JSON and validate that
///    `id`, `owner` and a three-part `resource_ref` (`name.namespace.kind`) are
///    present.
/// 4. Obtain an agent token and forward the approval response via
///    `process_human_response`, using the `DatabaseConnection` stored in the
///    listener's user state.
/// 5. Replace the original Slack message with a summary of the user's choice.
///
/// # Errors
///
/// Returns an error when the event is not a block action, when the payload is
/// missing required fields (actions, value, channel, message), when the
/// metadata is malformed, when no database connection is registered in the
/// user state, when `SLACK_BOT_TOKEN` is unset, or when any downstream call
/// (token creation, response processing, Slack message update) fails.
async fn handle_interactions(
    interaction: SlackInteractionEvent,
    client: Arc<SlackHyperClient>,
    states: SlackClientEventsUserState,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    debug!("[Slack socket] handle_approval_interaction");
    debug!("[Slack socket] interaction: {:?}", interaction);

    // Only block-action events (button clicks) are supported.
    let block_actions = match interaction {
        SlackInteractionEvent::BlockActions(block_actions) => block_actions,
        _ => {
            return Err(MySlackAppError::Other("Unsupported interaction type".into()).into());
        }
    };

    // Only the first action of the event is inspected.
    let action = block_actions
        .actions
        .as_ref()
        .ok_or(MySlackAppError::InvalidPayload)?
        .first()
        .ok_or(MySlackAppError::InvalidPayload)?;

    debug!("[Slack socket] action: {:?}", action);

    // Check if this is a boolean action (matching our button action_ids)
    let action_id_str = action.action_id.to_string();
    if !matches!(
        action_id_str.as_str(),
        "boolean_action_yes" | "boolean_action_no"
    ) {
        debug!(
            "[Slack socket] Ignoring non-boolean action: {}",
            action_id_str
        );
        return Ok(());
    }

    // action.value is our JSON like: {"approved": true, "question": "...", ...}
    let value_str = action
        .value
        .as_ref()
        .ok_or(MySlackAppError::InvalidPayload)?;
    // Keep the serde error in the message so malformed payloads can be diagnosed.
    let metadata: FeedbackMetadata = serde_json::from_str(value_str).map_err(|e| {
        MySlackAppError::Other(format!("Failed to parse JSON from action value: {}", e))
    })?;

    debug!("[Slack socket] metadata: {:?}", metadata);

    // Basic validation
    let feedback_id = metadata
        .id
        .as_ref()
        .ok_or_else(|| MySlackAppError::Other("Feedback ID not found in metadata".into()))?;

    debug!("[Slack socket] feedback_id: {:?}", feedback_id);

    let owner = metadata
        .owner
        .as_ref()
        .ok_or_else(|| MySlackAppError::Other("Owner not found in metadata".into()))?;

    debug!("[Slack socket] owner: {:?}", owner);

    let ref_str = metadata
        .resource_ref
        .as_ref()
        .ok_or_else(|| MySlackAppError::Other("Resource ref not found in metadata".into()))?;

    debug!("[Slack socket] ref_str: {:?}", ref_str);

    // The resource ref must have exactly three dot-separated parts: name.namespace.kind
    let parts: Vec<&str> = ref_str.split('.').collect();
    if parts.len() != 3 {
        return Err(
            MySlackAppError::Other("Resource ref is not in the correct format".into()).into(),
        );
    }

    debug!("[Slack socket] parts: {:?}", parts);

    let name = parts[0];
    let namespace = parts[1];
    let _kind = parts[2];

    debug!("[Slack socket] name: {:?}", name);
    debug!("[Slack socket] namespace: {:?}", namespace);
    debug!("[Slack socket] kind: {:?}", _kind);

    let question_text = metadata
        .question
        .clone()
        .unwrap_or_else(|| "<no question>".to_string());

    // Create the approval response separately since the enum doesn't accept parameters
    let approval_response = V1ApprovalResponse {
        content: question_text.clone(),
        messages: metadata.messages.clone(),
        images: metadata.images.clone(),
        videos: metadata.videos.clone(),
    };

    let feedback_response = V1FeedbackResponse {
        kind: "approval".to_string(),
        response: Some(V1FeedbackResponseKind::V1ApprovalResponse(
            approval_response,
        )),
    };

    // Extract db_pool from the listener's user state.
    let state_storage = states.read().await;
    let db_pool = state_storage
        .get_user_state::<DatabaseConnection>()
        .ok_or_else(|| MySlackAppError::Other("Could not get database connection".into()))?;

    let owner_ids = vec![owner.to_string()];
    debug!("owner_ids: {:?}", owner_ids);

    // An owner containing '@' is passed as the user; anything else as the org.
    let (user, org): (Option<&str>, Option<&str>) = if owner.contains('@') {
        (Some(owner.as_str()), None)
    } else {
        (None, Some(owner.as_str()))
    };

    debug!("getting agent token...");
    let orign_agent_key = get_agent_token_root(
        "orign-slack-server",
        "orign-slack-server",
        org,
        Some("agent"),
        user,
    )
    .await
    .map_err(|e| MySlackAppError::Other(e.to_string()))?;

    // Deliberately do not log the token itself: it is a credential.
    debug!("got agent token");
    debug!("processing human response...");

    process_human_response(
        &db_pool,
        &owner_ids,
        &orign_agent_key,
        &namespace,
        &name,
        feedback_id,
        feedback_response,
    )
    .await
    .map_err(|e| MySlackAppError::Other(format!("Request failed: {:?}", e)))?;

    // We'll update the Slack message:
    // Slack says to respond with a "response_action" : "update" or use chat.update
    // With Slack Morphism, we can do something like SlackApiChatUpdateRequest.
    debug!("[Slack socket] updating message...");

    // Channel and message come from the Slack payload and may be absent;
    // report an invalid payload instead of panicking.
    let channel_id = block_actions
        .channel
        .as_ref()
        .map(|channel| channel.id.clone())
        .ok_or(MySlackAppError::InvalidPayload)?;
    let ts = block_actions
        .message
        .as_ref()
        .map(|message| message.origin.ts.clone())
        .ok_or(MySlackAppError::InvalidPayload)?;
    let user_response_text = if metadata.approved { "Yes" } else { "No" };

    // We can rebuild new blocks that substitute the actions block
    let new_text = format!(
        "The user responded *{}* for question: `{}`",
        user_response_text, question_text
    );

    let token = SlackApiToken::new(env_var("SLACK_BOT_TOKEN")?.into());
    let session = client.open_session(&token);

    session
        .chat_update(&SlackApiChatUpdateRequest::new(
            channel_id,
            SlackMessageContent::new()
                .with_text(format!(
                    "[{}] The user chose {}.",
                    feedback_id, user_response_text
                ))
                .with_blocks(vec![SlackSectionBlock::new()
                    .with_text(md!(new_text))
                    .into()]),
            ts,
        ))
        .await?;

    debug!("[Slack socket] message updated");

    Ok(())
}

/// Reads the environment variable `name`.
///
/// Any failure from `env::var` is reported as [`MySlackAppError::EnvVar`]
/// carrying the variable's name.
fn env_var(name: &str) -> MyResult<String> {
    match env::var(name) {
        Ok(value) => Ok(value),
        Err(_) => Err(MySlackAppError::EnvVar(name.to_string())),
    }
}

/// Starts the Slack Socket Mode listener in a background task.
///
/// `db_pool` is registered as listener user state so that
/// `handle_interactions` can retrieve it. `shutdown_rx` is awaited inside the
/// background task; when it yields (a message or a closed channel) the
/// listener is shut down and the task ends.
///
/// This function returns as soon as the background task has been spawned. It
/// does not wait for the connection to Slack: a failure of `listen_for` is
/// only written to stderr by the task.
///
/// # Errors
///
/// Returns an error if the HTTP connector cannot be created or if
/// `SLACK_APP_TOKEN` cannot be read from the environment.
pub async fn run_slack_socket_server(
    db_pool: DatabaseConnection,
    mut shutdown_rx: broadcast::Receiver<()>,
) -> MyResult<()> {
    debug!("Starting Slack socket mode server");

    let client = Arc::new(SlackClient::new(SlackClientHyperConnector::new()?));

    // Route every interaction event to `handle_interactions`.
    let socket_mode_callbacks = SlackSocketModeListenerCallbacks::new()
        .with_interaction_events(|event, client, states| {
            Box::pin(handle_interactions(event, client, states))
        });

    let listener_environment = Arc::new(
        SlackClientEventsListenerEnvironment::new(client.clone())
            .with_user_state(db_pool) // Pass the database connection to handlers
            .with_error_handler(|err, _cli, _states| {
                println!("[Slack socket] Slack error: {:?}", err);
                HttpStatusCode::OK
            }),
    );

    let socket_mode_listener = SlackClientSocketModeListener::new(
        &SlackClientSocketModeConfig::new(),
        listener_environment.clone(),
        socket_mode_callbacks,
    );

    let app_token_value: SlackApiTokenValue = env_var("SLACK_APP_TOKEN")?.into();
    let app_token: SlackApiToken = SlackApiToken::new(app_token_value);

    // Listen in the background with proper shutdown handling
    tokio::spawn(async move {
        if let Err(e) = socket_mode_listener.listen_for(&app_token).await {
            eprintln!("[Slack socket] Slack socket mode error: {:?}", e);
            return;
        }

        tokio::select! {
            _ = socket_mode_listener.serve() => {
                println!("[Slack socket] Slack socket mode server stopped");
            }
            _ = shutdown_rx.recv() => {
                println!("[Slack socket] Shutting down Slack socket mode server");
                // `serve()` was cancelled by the select, so stop the socket
                // clients explicitly rather than just dropping the listener.
                socket_mode_listener.shutdown().await;
            }
        }
    });

    // Printed once the task is spawned; the connection itself may still be pending.
    println!("[Slack socket] Slack socket mode server started");

    Ok(())
}