pdk-classy 1.9.1-alpha.2

PDK Classy
Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::{any::Any, cell::RefCell, collections::BTreeMap, rc::Rc, task::Waker};

use crate::{
    client::HttpCallResponse,
    event::EventKind,
    grpc::transport::{GrpcCallId, GrpcResponseParts},
    types::{Cid, HttpCid, RequestId, RootCid},
};

use super::http::HttpReactor;

// HTTP Client Support
/// Type-erased value obtained by running a [`BoxedExtractor`] on a response.
pub type ResponseContent = Box<dyn Any>;
/// One-shot closure that derives a type-erased value from a borrowed
/// [`HttpCallResponse`].
pub type BoxedExtractor = Box<dyn FnOnce(&HttpCallResponse) -> Box<dyn Any>>;

/// Identifier of a ticker registered in the root reactor.
///
/// Ids compare by their wrapped counter value; the derived `PartialOrd`/`Ord`
/// provide exactly that ordering, which lets the id be used as a `BTreeMap` key.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct TickerId(usize);

/// Hands out [`TickerId`]s in increasing order, starting at zero.
struct TickerIdGenerator {
    // Despite its name, this holds the id that the next `generate` call returns.
    last_id: TickerId,
}

impl TickerIdGenerator {
    /// Creates a generator whose first generated id is `TickerId(0)`.
    fn new() -> Self {
        let last_id = TickerId(0);
        Self { last_id }
    }

    /// Returns the currently stored id and advances the stored id by one.
    fn generate(&mut self) -> TickerId {
        let following = TickerId(self.last_id.0 + 1);
        // Swap the successor in and hand back the id that was stored before.
        std::mem::replace(&mut self.last_id, following)
    }
}

/// Mutable state of the root reactor: the active context, the registered HTTP
/// reactors, the tickers, and the bookkeeping for pending HTTP and gRPC calls.
struct RawRootReactor {
    // Id of the root context this reactor belongs to.
    context_id: RootCid,
    // Context currently considered active; reset to the root context whenever
    // an HTTP context is marked as done.
    active_cid: Cid,
    // Waker that is woken each time a new HTTP context is created.
    context_create_waker: Option<Waker>,
    // Most recently created HTTP reactor, kept until it is taken.
    new_http_reactor: Option<Rc<HttpReactor>>,
    // Source of ids for newly registered tickers.
    ticker_id_generator: TickerIdGenerator,
    // Per ticker: its waker and a flag telling whether a tick is pending
    // (set by `notify_tick`, cleared by `consume_tick`).
    tickers: BTreeMap<TickerId, (Waker, bool)>,
    // HTTP reactors by context id, from creation until the context is done.
    http_reactors: BTreeMap<HttpCid, Rc<HttpReactor>>,

    // HTTP Client Support
    // Extractor to run on the response of a request when it arrives.
    extractors: BTreeMap<RequestId, BoxedExtractor>,
    // Waker to wake when the response of a request arrives.
    clients: BTreeMap<RequestId, Waker>,
    // Arrived responses, with the extracted content if an extractor was
    // registered, kept until they are taken.
    responses: BTreeMap<RequestId, (HttpCallResponse, Option<ResponseContent>)>,

    // Grpc Client Support
    // Waker to wake when the response of a gRPC call arrives.
    grpc_clients: BTreeMap<GrpcCallId, Waker>,
    // Arrived gRPC responses, kept until they are taken.
    grpc_responses: BTreeMap<GrpcCallId, GrpcResponseParts>,

    // Set by `set_done`; nothing in this type clears it again.
    done: bool,
}

impl RawRootReactor {
    /// Returns the id of the root context this reactor belongs to.
    fn context_id(&self) -> RootCid {
        self.context_id
    }

    /// Returns the current event of the active HTTP context.
    ///
    /// Yields `None` when the root context is active, or when no reactor is
    /// registered for the active HTTP context id.
    fn current_event(&self) -> Option<EventKind> {
        match self.active_cid {
            Cid::Http(id) => self.http_reactors.get(&id).map(|r| r.current_event()),
            // Spelled out instead of `_` so that a new `Cid` variant forces an
            // explicit decision here.
            Cid::Root(_) => None,
        }
    }

    /// Returns the id of the context currently marked as active.
    fn active_cid(&self) -> Cid {
        self.active_cid
    }

    /// Marks `active_cid` as the active context.
    fn set_active_cid(&mut self, active_cid: Cid) {
        self.active_cid = active_cid;
    }

    /// Returns whether `set_done` has been called.
    fn done(&self) -> bool {
        self.done
    }

    /// Marks the reactor as done.
    fn set_done(&mut self) {
        self.done = true;
    }

    /// Stores an arrived HTTP response and wakes the client waiting for it.
    ///
    /// If an extractor is registered for the request it is removed and run on
    /// the response; its result is stored next to the response. The client
    /// waker, if any, is woken but stays registered.
    fn notify_response(&mut self, response: HttpCallResponse) {
        let request_id = response.request_id;
        let content = self
            .extractors
            .remove(&request_id)
            .map(|extract| extract(&response));
        self.responses.insert(request_id, (response, content));
        if let Some(client) = self.clients.get(&request_id) {
            client.wake_by_ref();
        }
    }

    /// Installs the waker to wake when a new HTTP context is created,
    /// replacing any previous one.
    fn insert_create_waker(&mut self, waker: Waker) {
        self.context_create_waker = Some(waker);
    }

    /// Removes and returns the context-creation waker, if one is installed.
    fn take_create_waker(&mut self) -> Option<Waker> {
        self.context_create_waker.take()
    }

    /// Forwards the paused state to the HTTP reactor identified by `cid`.
    ///
    /// For a root `cid` nothing is changed; a warning is logged when the id is
    /// not this reactor's own. A warning is also logged when no HTTP reactor is
    /// registered for an HTTP `cid`.
    fn set_paused(&self, cid: Cid, paused: bool) {
        match cid {
            Cid::Root(id) => {
                if id != self.context_id {
                    log::warn!("Trying to set pausing state from an invalid Context Reactor with id = {id:?}");
                }
            }
            Cid::Http(id) => {
                if let Some(reactor) = self.http_reactors.get(&id) {
                    reactor.set_paused(paused)
                } else {
                    log::warn!(
                        "Trying to set pausing state from a missing Http Reactor with id = {id:?}"
                    );
                }
            }
        }
    }

    /// Unregisters the HTTP reactor of `context_id` and makes the root context
    /// the active one again.
    fn set_http_context_done(&mut self, context_id: HttpCid) {
        self.http_reactors.remove(&context_id);
        self.set_active_cid(Cid::Root(self.context_id));
    }

    /// Creates and registers the HTTP reactor for `context_id`, then wakes the
    /// context-creation waker.
    ///
    /// Returns `None` when no context-creation waker is installed. The new
    /// reactor is registered and stored as the newest reactor in that case too.
    // NOTE(review): registering before the waker check looks deliberate, but
    // confirm that callers expect the reactor to exist when `None` is returned.
    fn create_http_context(&mut self, context_id: HttpCid) -> Option<Rc<HttpReactor>> {
        let new_http_reactor = Rc::new(HttpReactor::new(context_id));
        self.new_http_reactor = Some(new_http_reactor.clone());
        self.http_reactors
            .insert(context_id, new_http_reactor.clone());

        let waker = self.context_create_waker.as_ref()?;

        waker.wake_by_ref();

        Some(new_http_reactor)
    }

    /// Removes and returns the most recently created HTTP reactor, if it has
    /// not been taken yet.
    fn take_new_http_reactor(&mut self) -> Option<Rc<HttpReactor>> {
        self.new_http_reactor.take()
    }

    /// Registers the waker to wake when the response for `request_id` arrives.
    fn insert_client(&mut self, request_id: RequestId, waker: Waker) {
        self.clients.insert(request_id, waker);
    }

    /// Registers the extractor to run on the response for `request_id`.
    fn insert_extractor(&mut self, request_id: RequestId, extractor: BoxedExtractor) {
        self.extractors.insert(request_id, extractor);
    }

    /// Removes and returns the extractor registered for `request_id`, if any.
    fn remove_extractor(&mut self, request_id: RequestId) -> Option<BoxedExtractor> {
        self.extractors.remove(&request_id)
    }

    /// Removes and returns the waker registered for `request_id`, if any.
    fn remove_client(&mut self, request_id: RequestId) -> Option<Waker> {
        self.clients.remove(&request_id)
    }

    /// Removes and returns the stored response for `request_id` together with
    /// its extracted content, if a response has arrived.
    fn take_response(
        &mut self,
        request_id: RequestId,
    ) -> Option<(HttpCallResponse, Option<ResponseContent>)> {
        self.responses.remove(&request_id)
    }

    // gRPC support

    /// Stores an arrived gRPC response and wakes the client waiting for it.
    ///
    /// The client waker, if any, is woken but stays registered.
    fn notify_grpc_call_response(&mut self, grpc_response: GrpcResponseParts) {
        let call_id = grpc_response.event.call_id;
        self.grpc_responses.insert(call_id, grpc_response);
        if let Some(client) = self.grpc_clients.get(&call_id) {
            client.wake_by_ref();
        }
    }

    /// Registers the waker to wake when the response for `call_id` arrives.
    fn insert_grpc_client(&mut self, call_id: GrpcCallId, waker: Waker) {
        self.grpc_clients.insert(call_id, waker);
    }

    /// Removes and returns the waker registered for `call_id`, if any.
    fn remove_grpc_client(&mut self, call_id: GrpcCallId) -> Option<Waker> {
        self.grpc_clients.remove(&call_id)
    }

    /// Removes and returns the stored gRPC response for `call_id`, if one has
    /// arrived.
    fn take_grpc_response(&mut self, call_id: GrpcCallId) -> Option<GrpcResponseParts> {
        self.grpc_responses.remove(&call_id)
    }

    // Tick support

    /// Registers `ticker` under a freshly generated id and returns that id.
    ///
    /// The new ticker starts without a pending tick.
    fn insert_ticker(&mut self, ticker: Waker) -> TickerId {
        let id = self.ticker_id_generator.generate();
        self.tickers.insert(id, (ticker, false));
        id
    }

    /// Reports whether a tick is pending for ticker `id` and clears the flag.
    ///
    /// The stored waker is replaced by a clone of `new_waker` unless
    /// `will_wake` reports that both wake the same task. Returns `false` when
    /// `id` is not registered.
    fn consume_tick(&mut self, id: TickerId, new_waker: &Waker) -> bool {
        let Some((waker, available)) = self.tickers.get_mut(&id) else {
            return false;
        };

        if !waker.will_wake(new_waker) {
            waker.clone_from(new_waker);
        }

        let result = *available;
        *available = false;

        result
    }

    /// Unregisters ticker `id`, returning its waker and pending flag if it was
    /// registered.
    fn remove_ticker(&mut self, id: TickerId) -> Option<(Waker, bool)> {
        self.tickers.remove(&id)
    }

    /// Flags a pending tick on every registered ticker and wakes each of them.
    ///
    /// Returns `true` when at least one ticker is registered.
    fn notify_tick(&mut self) -> bool {
        for (waker, ready) in self.tickers.values_mut() {
            *ready = true;
            waker.wake_by_ref();
        }
        !self.tickers.is_empty()
    }
}

/// Shared-reference facade over the root reactor state.
///
/// The state lives in a `RefCell`, so the methods take `&self` and borrow the
/// state for the duration of each call.
pub struct RootReactor {
    // Interior-mutable reactor state, borrowed per call.
    raw: RefCell<RawRootReactor>,
}

// Every method borrows the inner `RefCell` for the duration of the call, so
// calling one of them while the state is already mutably borrowed panics.
impl RootReactor {
    /// Creates a reactor for the root context `context_id`.
    ///
    /// The root context starts out as the active one; no waker, reactor,
    /// ticker, pending call or response is registered and the reactor is not
    /// done.
    pub fn new(context_id: RootCid) -> Self {
        Self {
            raw: RefCell::new(RawRootReactor {
                context_id,
                active_cid: Cid::Root(context_id),
                context_create_waker: None,
                new_http_reactor: None,
                ticker_id_generator: TickerIdGenerator::new(),
                tickers: BTreeMap::new(),
                http_reactors: BTreeMap::new(),
                extractors: BTreeMap::new(),
                clients: BTreeMap::new(),
                responses: BTreeMap::new(),
                grpc_clients: BTreeMap::new(),
                grpc_responses: BTreeMap::new(),
                done: false,
            }),
        }
    }

    /// Returns the id of the context currently marked as active.
    pub fn active_cid(&self) -> Cid {
        self.raw.borrow().active_cid()
    }

    /// Marks `active_cid` as the active context.
    pub fn set_active_cid(&self, active_cid: Cid) {
        self.raw.borrow_mut().set_active_cid(active_cid);
    }

    /// Creates and registers the HTTP reactor for `context_id`, then wakes the
    /// context-creation waker.
    ///
    /// Returns `None` when no context-creation waker is installed; the reactor
    /// is registered in that case too.
    pub fn create_http_context(&self, context_id: HttpCid) -> Option<Rc<HttpReactor>> {
        self.raw.borrow_mut().create_http_context(context_id)
    }

    /// Returns the id of the root context this reactor belongs to.
    pub fn context_id(&self) -> RootCid {
        self.raw.borrow().context_id()
    }

    /// Returns the current event of the active HTTP context, or `None` when
    /// the root context is active or the active HTTP context has no reactor.
    pub fn current_event(&self) -> Option<EventKind> {
        self.raw.borrow().current_event()
    }

    /// Installs the waker to wake when a new HTTP context is created,
    /// replacing any previous one.
    pub fn insert_create_waker(&self, waker: Waker) {
        self.raw.borrow_mut().insert_create_waker(waker);
    }

    /// Removes and returns the most recently created HTTP reactor, if it has
    /// not been taken yet.
    pub fn take_new_http_reactor(&self) -> Option<Rc<HttpReactor>> {
        self.raw.borrow_mut().take_new_http_reactor()
    }

    /// Removes and returns the context-creation waker, if one is installed.
    pub fn take_create_waker(&self) -> Option<Waker> {
        self.raw.borrow_mut().take_create_waker()
    }

    /// Forwards the paused state to the HTTP reactor identified by `cid`.
    ///
    /// Logs a warning when `cid` is a foreign root id or an HTTP id without a
    /// registered reactor.
    pub fn set_paused(&self, cid: Cid, paused: bool) {
        // The inner method only needs `&self`, so a shared borrow is enough.
        self.raw.borrow().set_paused(cid, paused);
    }

    /// Unregisters the HTTP reactor of `context_id` and makes the root context
    /// the active one again.
    pub fn set_http_context_done(&self, context_id: HttpCid) {
        self.raw.borrow_mut().set_http_context_done(context_id);
    }

    /// Returns whether `set_done` has been called.
    pub fn done(&self) -> bool {
        self.raw.borrow().done()
    }

    /// Marks the reactor as done.
    pub fn set_done(&self) {
        self.raw.borrow_mut().set_done();
    }

    /// Stores an arrived HTTP response, running the extractor registered for
    /// its request (if any), and wakes the client waiting for it.
    pub fn notify_response(&self, response: HttpCallResponse) {
        self.raw.borrow_mut().notify_response(response)
    }

    /// Registers the waker to wake when the response for `request_id` arrives.
    pub fn insert_client(&self, request_id: RequestId, waker: Waker) {
        self.raw.borrow_mut().insert_client(request_id, waker);
    }

    /// Registers the extractor to run on the response for `request_id`.
    pub fn insert_extractor(&self, request_id: RequestId, extractor: BoxedExtractor) {
        self.raw
            .borrow_mut()
            .insert_extractor(request_id, extractor);
    }

    /// Removes and returns the waker registered for `request_id`, if any.
    pub fn remove_client(&self, request_id: RequestId) -> Option<Waker> {
        self.raw.borrow_mut().remove_client(request_id)
    }

    /// Removes and returns the extractor registered for `request_id`, if any.
    pub fn remove_extractor(&self, request_id: RequestId) -> Option<BoxedExtractor> {
        self.raw.borrow_mut().remove_extractor(request_id)
    }

    /// Removes and returns the stored response for `request_id` together with
    /// its extracted content, if a response has arrived.
    pub fn remove_response(
        &self,
        request_id: RequestId,
    ) -> Option<(HttpCallResponse, Option<ResponseContent>)> {
        self.raw.borrow_mut().take_response(request_id)
    }

    /// Registers `ticker` under a freshly generated id and returns that id.
    ///
    /// The new ticker starts without a pending tick.
    pub fn insert_ticker(&self, ticker: Waker) -> TickerId {
        self.raw.borrow_mut().insert_ticker(ticker)
    }

    /// Reports whether a tick is pending for ticker `id` and clears the flag.
    ///
    /// The stored waker is refreshed from `new_waker` when needed. Returns
    /// `false` when `id` is not registered.
    pub fn consume_tick(&self, id: TickerId, new_waker: &Waker) -> bool {
        self.raw.borrow_mut().consume_tick(id, new_waker)
    }

    /// Unregisters ticker `id`, returning its waker and pending flag if it was
    /// registered.
    pub fn remove_ticker(&self, id: TickerId) -> Option<(Waker, bool)> {
        self.raw.borrow_mut().remove_ticker(id)
    }

    /// Flags a pending tick on every registered ticker and wakes each of them.
    ///
    /// Returns `true` when at least one ticker is registered.
    pub fn notify_tick(&self) -> bool {
        self.raw.borrow_mut().notify_tick()
    }

    /// Stores an arrived gRPC response and wakes the client waiting for it.
    pub fn notify_grpc_call_response(&self, grpc_response: GrpcResponseParts) {
        self.raw
            .borrow_mut()
            .notify_grpc_call_response(grpc_response)
    }

    /// Registers the waker to wake when the response for `call_id` arrives.
    pub fn insert_grpc_client(&self, call_id: GrpcCallId, waker: Waker) {
        self.raw.borrow_mut().insert_grpc_client(call_id, waker);
    }

    /// Removes and returns the waker registered for `call_id`, if any.
    pub fn remove_grpc_client(&self, call_id: GrpcCallId) -> Option<Waker> {
        self.raw.borrow_mut().remove_grpc_client(call_id)
    }

    /// Removes and returns the stored gRPC response for `call_id`, if one has
    /// arrived.
    pub fn remove_grpc_response(&self, call_id: GrpcCallId) -> Option<GrpcResponseParts> {
        self.raw.borrow_mut().take_grpc_response(call_id)
    }
}