mf_core/actors/
extension_manager.rs1use ractor::{Actor, ActorRef, ActorProcessingErr};
6use std::sync::Arc;
7use tokio::sync::oneshot;
8
9use crate::{
10 debug::debug, error::ForgeResult, extension_manager::ExtensionManager,
11 extension::OpFn,
12};
13
14use mf_model::schema::Schema;
15use mf_state::plugin::Plugin;
16
17use super::ActorSystemResult;
18
19pub enum ExtensionMessage {
21 GetSchema { reply: oneshot::Sender<Arc<Schema>> },
23 GetPlugins { reply: oneshot::Sender<Vec<Arc<Plugin>>> },
25 GetOpFns { reply: oneshot::Sender<OpFn> },
27 ReloadExtensions { reply: oneshot::Sender<ForgeResult<()>> },
29}
30
31impl std::fmt::Debug for ExtensionMessage {
33 fn fmt(
34 &self,
35 f: &mut std::fmt::Formatter<'_>,
36 ) -> std::fmt::Result {
37 match self {
38 ExtensionMessage::GetSchema { .. } => {
39 write!(f, "GetSchema {{ .. }}")
40 },
41 ExtensionMessage::GetPlugins { .. } => {
42 write!(f, "GetPlugins {{ .. }}")
43 },
44 ExtensionMessage::GetOpFns { .. } => write!(f, "GetOpFns {{ .. }}"),
45 ExtensionMessage::ReloadExtensions { .. } => {
46 write!(f, "ReloadExtensions {{ .. }}")
47 },
48 }
49 }
50}
51
52pub struct ExtensionManagerActorState {
56 extension_manager: ExtensionManager,
58}
59
60pub struct ExtensionManagerActor;
62
63#[ractor::async_trait]
64impl Actor for ExtensionManagerActor {
65 type Msg = ExtensionMessage;
66 type State = ExtensionManagerActorState;
67 type Arguments = ExtensionManager;
68
69 async fn pre_start(
70 &self,
71 _myself: ActorRef<Self::Msg>,
72 extension_manager: Self::Arguments,
73 ) -> Result<Self::State, ActorProcessingErr> {
74 debug!("启动扩展管理Actor");
75
76 Ok(ExtensionManagerActorState { extension_manager })
77 }
78
79 async fn handle(
80 &self,
81 _myself: ActorRef<Self::Msg>,
82 message: Self::Msg,
83 state: &mut Self::State,
84 ) -> Result<(), ActorProcessingErr> {
85 match message {
86 ExtensionMessage::GetSchema { reply } => {
87 let schema = state.extension_manager.get_schema();
88 let _ = reply.send(schema);
89 },
90
91 ExtensionMessage::GetPlugins { reply } => {
92 let plugins = state.extension_manager.get_plugins().clone();
93 let _ = reply.send(plugins);
94 },
95
96 ExtensionMessage::GetOpFns { reply } => {
97 let op_fns = state.extension_manager.get_op_fns().clone();
98 let _ = reply.send(op_fns);
99 },
100
101 ExtensionMessage::ReloadExtensions { reply } => {
102 let _ = reply.send(Ok(()));
105 },
106 }
107
108 Ok(())
109 }
110
111 async fn post_stop(
112 &self,
113 _myself: ActorRef<Self::Msg>,
114 _state: &mut Self::State,
115 ) -> Result<(), ActorProcessingErr> {
116 debug!("停止扩展管理Actor");
117 Ok(())
118 }
119}
120
121pub struct ExtensionManagerActorManager;
123
124impl ExtensionManagerActorManager {
125 pub async fn start(
127 extension_manager: ExtensionManager
128 ) -> ActorSystemResult<ActorRef<ExtensionMessage>> {
129 let (actor_ref, _handle) = Actor::spawn(
130 Some("ExtensionManagerActor".to_string()),
131 ExtensionManagerActor,
132 extension_manager,
133 )
134 .await
135 .map_err(|e| super::ActorSystemError::ActorStartupFailed {
136 actor_name: "ExtensionManagerActor".to_string(),
137 source: e,
138 })?;
139
140 debug!("扩展管理Actor启动成功");
141 Ok(actor_ref)
142 }
143}